-
Notifications
You must be signed in to change notification settings - Fork 434
Open
Description
I'm not sure whether the root cause is in asyncpg or in Temporal, but this code works fine on 0.30.0 and crashes on 0.31.0 inside `await asyncpg.connect`. Python 3.12.9.
#!/usr/bin/env python
import asyncio
import threading
from datetime import datetime, timedelta
import asyncpg
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
TASK_QUEUE = "asyncpg-temporal-test"
async def db_call(label: str) -> None:
    """Connect to Postgres and close.

    Prints the current thread id and the running event loop before
    connecting, so runs from different call sites can be compared.

    Args:
        label: Tag placed in front of every line this call prints.
    """
    loop = asyncio.get_running_loop()
    print(f"[{label}] thread={threading.get_ident()} loop_id={id(loop)} {loop!r}")
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        database="database",
        user="andrei",
    )
    try:
        print(f"[{label}] connected to Postgres")
    finally:
        # Release the connection even if something after connect() raises.
        await conn.close()
@activity.defn
async def db_activity() -> None:
    """Temporal activity that runs ``db_call`` with the label "activity"."""
    return await db_call("activity")
@workflow.defn
class TestWorkflow:
    """Workflow whose run method executes ``db_activity`` once."""

    @workflow.run
    async def run(self) -> None:
        """Execute ``db_activity`` on ``TASK_QUEUE`` and return its result.

        The activity is given a 30-second start-to-close timeout.
        """
        print("[workflow] starting db_activity")
        return await workflow.execute_activity(
            db_activity,
            start_to_close_timeout=timedelta(seconds=30),
            task_queue=TASK_QUEUE,
        )
async def main() -> None:
    """Run the two-step reproduction.

    Step 1 calls ``db_call`` directly. Step 2 connects a Temporal client to
    ``localhost:7233``, starts a worker on ``TASK_QUEUE`` and executes
    ``TestWorkflow``, which calls ``db_call`` again from inside an activity.
    """
    # 1) Baseline: asyncpg on a fresh loop, no Temporal
    print("=== 1) Direct asyncpg on fresh loop (no Temporal) ===")
    await db_call("direct")

    # 2) Bring up Temporal client + worker and run a workflow that calls db_activity
    print("\n=== 2) Temporal worker + workflow calling async activity ===")
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        workflows=[TestWorkflow],
        activities=[db_activity],
    )

    async with worker:
        # This executes TestWorkflow.run, which in turn executes db_activity
        result = await client.execute_workflow(
            TestWorkflow.run,
            # The current timestamp is used as the workflow id.
            id=datetime.now().isoformat(),
            task_queue=TASK_QUEUE,
        )
        print(f"[main] workflow result: {result}")
if __name__ == "__main__":
    asyncio.run(main())

Repro steps:
- Start a Temporal server:
  temporal server start-dev
- In a separate terminal, run the code above:
% python test_asyncpg.py
=== 1) Direct asyncpg on fresh loop (no Temporal) ===
[direct] thread=8654430720 loop_id=4353195520 <_UnixSelectorEventLoop running=True closed=False debug=False>
[direct] connected to Postgres
=== 2) Temporal worker + workflow calling async activity ===
[workflow] starting db_activity
[activity] thread=8654430720 loop_id=4353195520 <_UnixSelectorEventLoop running=True closed=False debug=False>
zsh: segmentation fault python test_asyncpg.py
Metadata
Metadata
Assignees
Labels
No labels