Skip to content

Fix: remove I/O Blocking code on message processing parts #819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
40c687a
Fix: create_async_engine does not handle same way application_name as…
1yam Jun 23, 2025
d33fedb
Refactor: Db Accessors to use AsyncDB instead of Sync
1yam Jul 7, 2025
46ba12a
Refactor: web/controllers to use AsyncDB instead of Sync
1yam Jul 7, 2025
ae2cc5b
Refactor: db vm accessor to use AsyncDB instead of Sync
1yam Jul 7, 2025
a9f0b71
Refactor: jobs to use AsyncDB instead of Sync
1yam Jul 7, 2025
27f704f
Refactor: services to use AsyncDB instead of Sync
1yam Jul 7, 2025
1157182
Refactor: handler to use AsyncDB instead of Sync
1yam Jul 7, 2025
65bf4aa
Refactor: AugmentedBase db models to use async DB instead of sync
1yam Jul 7, 2025
ffed9d1
Refactor: chains to use async DB instead of sync
1yam Jul 7, 2025
06030dc
Refactor: use async sessions factory instead of sync
1yam Jul 7, 2025
32724f0
Refactor: use async sessions factory instead of sync
1yam Jul 7, 2025
947cca2
fix: lint issue
1yam Jul 7, 2025
9e90632
Refactor: storage.py now needs to use AsyncDbSession
1yam Jul 7, 2025
7ab8ae8
Refactor: unit test to use AsyncDb instead of Sync
1yam Jul 7, 2025
92484bb
fix: remove debug print
1yam Jul 8, 2025
1cf2918
Fix: merge_aggregate_elements is I/O blocking, we use async.to_thread…
1yam Jul 8, 2025
d0a76ea
fix: filesystem_engine to use aiofiles to avoid I/O Blocking
1yam Jul 8, 2025
60151a2
fix: typo error
1yam Jul 8, 2025
c3e1530
Refactor: request hashes from peers in a more efficient way
1yam Jul 8, 2025
6769310
Refactor: request peer info in a more efficient way
1yam Jul 8, 2025
2f2d36f
fix: lint issue
1yam Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/aleph/api_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import aleph.config
from aleph.chains.signature_verifier import SignatureVerifier
from aleph.db.connection import make_engine, make_session_factory
from aleph.db.connection import make_async_engine, make_async_session_factory
from aleph.services.cache.node_cache import NodeCache
from aleph.services.ipfs import IpfsService
from aleph.services.p2p import init_p2p_client
Expand All @@ -34,12 +34,12 @@ async def configure_aiohttp_app(
with sentry_sdk.start_transaction(name="init-api-server"):
p2p_client = await init_p2p_client(config, service_name="api-server-aiohttp")

engine = make_engine(
engine = make_async_engine(
config,
echo=config.logging.level.value == logging.DEBUG,
application_name="aleph-api",
)
session_factory = make_session_factory(engine)
session_factory = make_async_session_factory(engine)

node_cache = NodeCache(
redis_host=config.redis.host.value, redis_port=config.redis.port.value
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/chains/bsc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
from aleph.chains.chain_data_service import PendingTxPublisher
from aleph.chains.indexer_reader import AlephIndexerReader
from aleph.types.chain_sync import ChainEventType
from aleph.types.db_session import DbSessionFactory
from aleph.types.db_session import AsyncDbSessionFactory


class BscConnector(ChainReader):
def __init__(
self,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
):
self.indexer_reader = AlephIndexerReader(
Expand Down
26 changes: 13 additions & 13 deletions src/aleph/chains/chain_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@
from aleph.storage import StorageService
from aleph.toolkit.timestamp import utc_now
from aleph.types.chain_sync import ChainSyncProtocol
from aleph.types.db_session import DbSession, DbSessionFactory
from aleph.types.db_session import AsyncDbSession, AsyncDbSessionFactory
from aleph.types.files import FileType
from aleph.utils import get_sha256


class ChainDataService:
def __init__(
self,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
storage_service: StorageService,
):
self.session_factory = session_factory
self.storage_service = storage_service

async def prepare_sync_event_payload(
self, session: DbSession, messages: List[MessageDb]
self, session: AsyncDbSession, messages: List[MessageDb]
) -> OffChainSyncEventPayload:
"""
Returns the payload of a sync event to be published on chain.
Expand Down Expand Up @@ -129,22 +129,22 @@ async def _get_tx_messages_off_chain_protocol(
LOGGER.info("Got bulk data with %d items" % len(messages))
if config.ipfs.enabled.value:
try:
with self.session_factory() as session:
async with self.session_factory() as session:
# Some chain data files are duplicated, and can be treated in parallel,
# hence the upsert.
upsert_file(
await upsert_file(
session=session,
file_hash=sync_file_content.hash,
file_type=FileType.FILE,
size=len(sync_file_content.raw_value),
)
upsert_tx_file_pin(
await upsert_tx_file_pin(
session=session,
file_hash=file_hash,
tx_hash=tx.hash,
created=utc_now(),
)
session.commit()
await session.commit()

# Some IPFS fetches can take a while, hence the large timeout.
await asyncio.wait_for(
Expand Down Expand Up @@ -246,17 +246,17 @@ def __init__(self, pending_tx_exchange: aio_pika.abc.AbstractExchange):
self.pending_tx_exchange = pending_tx_exchange

@staticmethod
def add_pending_tx(session: DbSession, tx: ChainTxDb):
upsert_chain_tx(session=session, tx=tx)
upsert_pending_tx(session=session, tx_hash=tx.hash)
async def add_pending_tx(session: AsyncDbSession, tx: ChainTxDb):
await upsert_chain_tx(session=session, tx=tx)
await upsert_pending_tx(session=session, tx_hash=tx.hash)

async def publish_pending_tx(self, tx: ChainTxDb):
message = aio_pika.Message(body=tx.hash.encode("utf-8"))
await self.pending_tx_exchange.publish(
message=message, routing_key=f"{tx.chain.value}.{tx.publisher}.{tx.hash}"
)

async def add_and_publish_pending_tx(self, session: DbSession, tx: ChainTxDb):
async def add_and_publish_pending_tx(self, session: AsyncDbSession, tx: ChainTxDb):
"""
Add an event published on one of the supported chains.
Adds the tx to the database, creates a pending tx entry in the pending tx table
Expand All @@ -265,8 +265,8 @@ async def add_and_publish_pending_tx(self, session: DbSession, tx: ChainTxDb):
Note that this function commits changes to the database for consistency
between the DB and the message queue.
"""
self.add_pending_tx(session=session, tx=tx)
session.commit()
await self.add_pending_tx(session=session, tx=tx)
await session.commit()
await self.publish_pending_tx(tx)

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/chains/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from aleph_message.models import Chain
from configmanager import Config

from aleph.types.db_session import DbSessionFactory
from aleph.types.db_session import AsyncDbSessionFactory

from .abc import ChainReader, ChainWriter
from .bsc import BscConnector
Expand All @@ -29,7 +29,7 @@ class ChainConnector:

def __init__(
self,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
chain_data_service: ChainDataService,
):
Expand Down
28 changes: 14 additions & 14 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from aleph.schemas.chains.tx_context import TxContext
from aleph.toolkit.timestamp import utc_now
from aleph.types.chain_sync import ChainEventType
from aleph.types.db_session import DbSessionFactory
from aleph.types.db_session import AsyncDbSessionFactory
from aleph.utils import run_in_executor

from .abc import ChainWriter
Expand Down Expand Up @@ -77,7 +77,7 @@ class EthereumVerifier(EVMVerifier):
class EthereumConnector(ChainWriter):
def __init__(
self,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
chain_data_service: ChainDataService,
):
Expand All @@ -93,8 +93,8 @@ def __init__(

async def get_last_height(self, sync_type: ChainEventType) -> int:
"""Returns the last height for which we already have the ethereum data."""
with self.session_factory() as session:
last_height = get_last_height(
async with self.session_factory() as session:
last_height = await get_last_height(
session=session, chain=Chain.ETH, sync_type=sync_type
)

Expand Down Expand Up @@ -209,15 +209,15 @@ async def _request_transactions(
# block height to do next requests from there.
last_height = event_data.blockNumber
if last_height:
with self.session_factory() as session:
upsert_chain_sync_status(
async with self.session_factory() as session:
await upsert_chain_sync_status(
session=session,
chain=Chain.ETH,
sync_type=ChainEventType.SYNC,
height=last_height,
update_datetime=utc_now(),
)
session.commit()
await session.commit()

async def fetch_ethereum_sync_events(self, config: Config):
last_stored_height = await self.get_last_height(sync_type=ChainEventType.SYNC)
Expand All @@ -236,11 +236,11 @@ async def fetch_ethereum_sync_events(self, config: Config):
config, web3, contract, abi, last_stored_height
):
tx = ChainTxDb.from_sync_tx_context(tx_context=context, tx_data=jdata)
with self.session_factory() as session:
async with self.session_factory() as session:
await self.pending_tx_publisher.add_and_publish_pending_tx(
session=session, tx=tx
)
session.commit()
await session.commit()

async def fetch_sync_events_task(self, config: Config):
while True:
Expand Down Expand Up @@ -295,10 +295,10 @@ async def packer(self, config: Config):
i = 0
gas_price = web3.eth.generate_gas_price()
while True:
with self.session_factory() as session:
async with self.session_factory() as session:
# Wait for sync operations to complete
if (count_pending_txs(session=session, chain=Chain.ETH)) or (
count_pending_messages(session=session, chain=Chain.ETH)
if (await count_pending_txs(session=session, chain=Chain.ETH)) or (
await count_pending_messages(session=session, chain=Chain.ETH)
) > 1000:
await asyncio.sleep(30)
continue
Expand All @@ -317,7 +317,7 @@ async def packer(self, config: Config):
nonce = web3.eth.get_transaction_count(account.address)

messages = list(
get_unconfirmed_messages(
await get_unconfirmed_messages(
session=session, limit=10000, chain=Chain.ETH
)
)
Expand All @@ -332,7 +332,7 @@ async def packer(self, config: Config):
)
)
# Required to apply update to the files table in get_chaindata
session.commit()
await session.commit()
response = await run_in_executor(
None,
self._broadcast_content,
Expand Down
22 changes: 12 additions & 10 deletions src/aleph/chains/indexer_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from aleph.toolkit.range import MultiRange, Range
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.chain_sync import ChainEventType, ChainSyncProtocol
from aleph.types.db_session import DbSession, DbSessionFactory
from aleph.types.db_session import AsyncDbSession, AsyncDbSessionFactory

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -246,7 +246,7 @@ class AlephIndexerReader:
def __init__(
self,
chain: Chain,
session_factory: DbSessionFactory,
session_factory: AsyncDbSessionFactory,
pending_tx_publisher: PendingTxPublisher,
):
self.chain = chain
Expand All @@ -257,7 +257,7 @@ def __init__(

async def fetch_range(
self,
session: DbSession,
session: AsyncDbSession,
indexer_client: AlephIndexerClient,
chain: Chain,
event_type: ChainEventType,
Expand Down Expand Up @@ -295,7 +295,9 @@ async def fetch_range(
LOGGER.info("%d new txs", len(txs))
# Events are listed in reverse order in the indexer response
for tx in txs:
self.pending_tx_publisher.add_pending_tx(session=session, tx=tx)
await self.pending_tx_publisher.add_pending_tx(
session=session, tx=tx
)

if nb_events_fetched >= limit:
last_event_datetime = txs[-1].datetime
Expand All @@ -320,7 +322,7 @@ async def fetch_range(
str(synced_range),
)

add_indexer_range(
await add_indexer_range(
session=session,
chain=chain,
event_type=event_type,
Expand All @@ -329,7 +331,7 @@ async def fetch_range(

# Committing periodically reduces the size of DB transactions for large numbers
# of events.
session.commit()
await session.commit()

# Now that the txs are committed to the DB, add them to the pending tx message queue
for tx in txs:
Expand All @@ -347,7 +349,7 @@ async def fetch_range(

async def fetch_new_events(
self,
session: DbSession,
session: AsyncDbSession,
indexer_url: str,
smart_contract_address: str,
event_type: ChainEventType,
Expand All @@ -372,7 +374,7 @@ async def fetch_new_events(
]
)

multirange_to_sync = get_missing_indexer_datetime_multirange(
multirange_to_sync = await get_missing_indexer_datetime_multirange(
session=session,
chain=self.chain,
event_type=event_type,
Expand All @@ -399,14 +401,14 @@ async def fetcher(
):
while True:
try:
with self.session_factory() as session:
async with self.session_factory() as session:
await self.fetch_new_events(
session=session,
indexer_url=indexer_url,
smart_contract_address=smart_contract_address,
event_type=event_type,
)
session.commit()
await session.commit()
except Exception:
LOGGER.exception(
"An unexpected exception occurred, "
Expand Down
Loading
Loading