Context managers in Depends are broken after 0.106 (#11143)
Comments
I commented in the Discussion that

No. The only reason I created this issue is because of your comment. 🙏
Does this comment by @tiangolo answer this issue: #11177 (reply in thread)?
I might be misinterpreting, but I don't think so? It seems it was intended to disable using resources in background tasks. But was it intended to also disable using resources in path operations with
It seems this is what breaks our streaming endpoints, and without streaming, throughput collapses and everything becomes slow.
A workaround for now is to use our connection pool directly in each endpoint doing streaming. Annoying, but it works.

```python
@router.get("/stuff", response_model=list[Stuff])
async def get_all_stuff(pool: PoolDep):
    connection = await pool.getconn()

    async def commit_and_put_in_pool():
        await connection.commit()
        await pool.putconn(connection)

    return StreamingResponse(
        stream_stuff(connection),
        media_type="application/json",
        background=commit_and_put_in_pool,
    )
```

This seems to work for us. But is it correct? Will the background task always run after the streaming is done? Or can we end up in a situation where it runs in the middle of streaming?
Our streaming endpoints are broken, probably due to this.
The release notes suggest the following:

i.e. just don't use
@scriptator we're not talking about background tasks here.
I was replying to @brian-goo, who talked about streaming responses, which behave like background tasks in this regard. I had a problem because I used a DB session, obtained with Depends, in a streaming response, and it was fixed by following the advice for background tasks: initiate your resources within the background task (or the generator function, in the case of streaming responses). But you are right if you meant that this does not contribute to the original problem from the issue.
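The "initiate your resources within the generator function" advice can be sketched as a pure-Python model — `fake_session` is a hypothetical stand-in for a real session factory such as `async_sessionmaker()`:

```python
import asyncio
from contextlib import asynccontextmanager

# Hypothetical session factory standing in for e.g. async_sessionmaker().
@asynccontextmanager
async def fake_session():
    session = {"open": True}
    try:
        yield session
    finally:
        session["open"] = False  # teardown tied to the generator's lifetime

async def stream_rows(sessions):
    # Open the resource inside the body generator, so its lifetime matches
    # the streaming, not the already-finished request handler.
    async with fake_session() as session:
        sessions.append(session)
        for i in range(3):
            yield f"row-{i}"

async def main():
    sessions = []
    chunks = [chunk async for chunk in stream_rows(sessions)]
    return chunks, sessions[0]["open"]

chunks, still_open = asyncio.run(main())
```

Because the session is opened inside the generator, it stays open for every yielded chunk and is closed only when the stream ends, regardless of when the dependency teardown runs.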
I don't think streaming responses should be considered similar to background tasks; they are part of the request/response cycle. Their behaving like background tasks in this case, i.e. not being able to use dependency injection in path operations, is likely a bug.
Any progress on this? Our workaround is not working as expected. It seems the background task is not guaranteed to run, so our connection pool fails whenever there is any error, essentially taking down the entire program, and we must restart. This basically renders streaming useless for us. Is there something else we can do to make it work?
For now, just staying on version

Does
+1 for this issue. It breaks request context management for stream proxies, i.e. I have a

The workaround in my case is to use a global client for the external streaming API, so that cleanup is not called on the return of
@btemplep could you provide a code example for your workaround?
We also encountered issues with streaming responses after upgrading FastAPI to > 0.106. We implemented a Git service over the HTTP protocol. While the streaming response is generating content for the client, it needs to access the database session and other services provided by

Is it possible to delay the destruction of dependencies until the HTTP session has actually closed (no more content will be sent to the client)? Or make it a special case when the returned response is a streaming response?
@libe Here is an example of what I expected to work:

```python
from typing import Any, AsyncGenerator, Annotated

import aioboto3
from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

engine = create_async_engine("sqlite+aiosqlite:///test.sqlite")
sessionmaker: async_sessionmaker[AsyncSession] = async_sessionmaker(expire_on_commit=False)

class Base(AsyncAttrs, DeclarativeBase):
    pass

class SomeTable(Base):
    __tablename__ = "some_table"

    some_id: Mapped[int] = mapped_column(primary_key=True, nullable=False)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with sessionmaker() as sess:
        yield sess

async def get_boto_client():
    async with aioboto3.Session().client("bedrock-runtime") as br_client:
        yield br_client

app = FastAPI()

async def _stream_resp(db_sess: AsyncSession, stream_resp: dict):
    async for chunk in stream_resp["body"]:
        # also do something with the db_sess
        yield chunk

@app.get("/route_here")
async def route_here(
    db_sess: Annotated[AsyncSession, Depends(get_db)],
    br_client: Annotated[Any, Depends(get_boto_client)],
):
    stream_resp = br_client.invoke_model_with_response_stream(args="here")
    return StreamingResponse(
        _stream_resp(
            db_sess=db_sess,
            stream_resp=stream_resp,
        )
    )
```

Here is a simplified version of what my fix is.

```python
from contextlib import asynccontextmanager
from typing import Annotated, AsyncGenerator

import aioboto3
from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

engine = create_async_engine("sqlite+aiosqlite:///test.sqlite")
sessionmaker: async_sessionmaker[AsyncSession] = async_sessionmaker(expire_on_commit=False)
br_client = None

class Base(AsyncAttrs, DeclarativeBase):
    pass

class SomeTable(Base):
    __tablename__ = "some_table"

    some_id: Mapped[int] = mapped_column(primary_key=True, nullable=False)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with sessionmaker() as sess:
        yield sess

@asynccontextmanager
async def lifespan(app: FastAPI):
    global br_client
    async with aioboto3.Session().client("bedrock-runtime") as br:
        br_client = br
        yield

app = FastAPI(lifespan=lifespan)

async def _stream_resp(stream_resp: dict):
    # In the streaming generator we need to start our own context manager for the DB
    async with sessionmaker() as db_sess:
        # Because the client is global, the stream on the backend is preserved.
        async for chunk in stream_resp["body"]:
            # also do something with db_sess
            yield chunk

@app.get("/route_here")
async def route_here(
    db_sess: Annotated[AsyncSession, Depends(get_db)],
):
    # br_client is now global, and shared
    stream_resp = br_client.invoke_model_with_response_stream(args="here")
    # do DB stuff here, we can still use the Depends
    return StreamingResponse(
        _stream_resp(stream_resp=stream_resp)
    )
```
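As a variant of the module-level global, the shared client can also live on application state set up in the lifespan, which avoids the `global` statement. A minimal pure-Python model of that pattern — `FakeClient` and `State` are stand-ins, not FastAPI objects, which expose the same idea via `app.state` and `request.app.state`:

```python
import asyncio
from contextlib import asynccontextmanager

# Hypothetical external streaming client; the real one would come from
# e.g. aioboto3 and stay open for the whole application lifetime.
class FakeClient:
    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True

class State:
    """Stands in for FastAPI's app.state attribute bag."""

@asynccontextmanager
async def lifespan(state):
    # Attach the shared client to state instead of a module-level global.
    client = FakeClient()
    state.client = client
    try:
        yield
    finally:
        await client.aclose()  # closed once, on application shutdown

async def main():
    state = State()
    async with lifespan(state):
        open_during_run = not state.client.closed  # usable by any "request"
    return open_during_run, state.client.closed

open_during_run, closed_after = asyncio.run(main())
```

The client stays open for every request served while the app runs and is closed exactly once at shutdown, so per-request dependency teardown can never interrupt an in-flight stream.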
Discussed in #11107
Originally posted by FeeeeK February 7, 2024
First Check
Commit to Help
Example Code
Description
Before 0.106, the code of a Depends after its yield ran after the middlewares, which allowed accessing resources created for a route (e.g. sessions) and doing something with them depending on the response (which otherwise cannot be done with Depends). After 0.106 the behavior changed and this is no longer possible. The documentation only talks about background tasks, not a word about middlewares. Was this behavior change intentional?

Operating System
Windows
Operating System Details
No response
FastAPI Version
0.109.2
Pydantic Version
2.4.2
Python Version
3.11.1
Additional Context
No response
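The ordering change described in the issue can be illustrated with a simplified model of how a framework drives a dependency with yield. This is an assumption-laden sketch of the ordering only; FastAPI's real internals differ:

```python
# Simplified model of a generator-based dependency. The only point is the
# order in which setup, response sending, and teardown happen.
events = []

def dep():
    events.append("dep setup")
    yield "resource"
    events.append("dep teardown")

def send_response_through_middlewares():
    events.append("response sent")

def run(teardown_after_response):
    events.clear()
    gen = dep()
    next(gen)  # setup; the handler receives the resource
    if teardown_after_response:
        # pre-0.106 ordering: teardown after the response left the stack,
        # so the dependency could still see/react to the response
        send_response_through_middlewares()
        next(gen, None)
    else:
        # post-0.106 ordering: teardown right after the handler returns,
        # before the response (or a streaming body) is fully sent
        next(gen, None)
        send_response_through_middlewares()
    return list(events)

pre_106 = run(teardown_after_response=True)
post_106 = run(teardown_after_response=False)
```

In the post-0.106 ordering, any resource held by the dependency is already torn down while the response body may still be streaming, which is exactly what the comments above observe.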