Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle async cancelled error explicitly #811

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
AsyncEvent,
AsyncShieldCancellation,
AsyncThreadLock,
)
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

Expand Down Expand Up @@ -205,7 +210,7 @@ async def handle_async_request(self, request: Request) -> Response:
else:
break # pragma: nocover

except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with self._optional_thread_lock:
# For any exception or cancellation we remove the request from
# the queue, and then re-assign requests to connections.
Expand Down
10 changes: 7 additions & 3 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
map_exceptions,
)
from .._models import Origin, Request, Response
from .._synchronization import AsyncLock, AsyncShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
AsyncLock,
AsyncShieldCancellation,
)
from .._trace import Trace
from .interfaces import AsyncConnectionInterface

Expand Down Expand Up @@ -136,7 +140,7 @@ async def handle_async_request(self, request: Request) -> Response:
"network_stream": network_stream,
},
)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with AsyncShieldCancellation():
async with Trace("response_closed", logger, request) as trace:
await self._response_closed()
Expand Down Expand Up @@ -340,7 +344,7 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
async with Trace("receive_response_body", logger, self._request, kwargs):
async for chunk in self._connection._receive_response_body(**kwargs):
yield chunk
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
Expand Down
13 changes: 9 additions & 4 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
RemoteProtocolError,
)
from .._models import Origin, Request, Response
from .._synchronization import AsyncLock, AsyncSemaphore, AsyncShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
AsyncLock,
AsyncSemaphore,
AsyncShieldCancellation,
)
from .._trace import Trace
from .interfaces import AsyncConnectionInterface

Expand Down Expand Up @@ -107,7 +112,7 @@ async def handle_async_request(self, request: Request) -> Response:
kwargs = {"request": request}
async with Trace("send_connection_init", logger, request, kwargs):
await self._send_connection_init(**kwargs)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with AsyncShieldCancellation():
await self.aclose()
raise exc
Expand Down Expand Up @@ -160,7 +165,7 @@ async def handle_async_request(self, request: Request) -> Response:
"stream_id": stream_id,
},
)
except BaseException as exc: # noqa: PIE786
except EXCEPTION_OR_CANCELLED as exc: # noqa: PIE786
with AsyncShieldCancellation():
kwargs = {"stream_id": stream_id}
async with Trace("response_closed", logger, request, kwargs):
Expand Down Expand Up @@ -573,7 +578,7 @@ async def __aiter__(self) -> typing.AsyncIterator[bytes]:
request=self._request, stream_id=self._stream_id
):
yield chunk
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
Expand Down
9 changes: 7 additions & 2 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, ShieldCancellation, ThreadLock
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
Event,
ShieldCancellation,
ThreadLock,
)
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface

Expand Down Expand Up @@ -205,7 +210,7 @@ def handle_request(self, request: Request) -> Response:
else:
break # pragma: nocover

except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with self._optional_thread_lock:
# For any exception or cancellation we remove the request from
# the queue, and then re-assign requests to connections.
Expand Down
10 changes: 7 additions & 3 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
map_exceptions,
)
from .._models import Origin, Request, Response
from .._synchronization import Lock, ShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
Lock,
ShieldCancellation,
)
from .._trace import Trace
from .interfaces import ConnectionInterface

Expand Down Expand Up @@ -136,7 +140,7 @@ def handle_request(self, request: Request) -> Response:
"network_stream": network_stream,
},
)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with ShieldCancellation():
with Trace("response_closed", logger, request) as trace:
self._response_closed()
Expand Down Expand Up @@ -340,7 +344,7 @@ def __iter__(self) -> Iterator[bytes]:
with Trace("receive_response_body", logger, self._request, kwargs):
for chunk in self._connection._receive_response_body(**kwargs):
yield chunk
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
Expand Down
13 changes: 9 additions & 4 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
RemoteProtocolError,
)
from .._models import Origin, Request, Response
from .._synchronization import Lock, Semaphore, ShieldCancellation
from .._synchronization import (
EXCEPTION_OR_CANCELLED,
Lock,
Semaphore,
ShieldCancellation,
)
from .._trace import Trace
from .interfaces import ConnectionInterface

Expand Down Expand Up @@ -107,7 +112,7 @@ def handle_request(self, request: Request) -> Response:
kwargs = {"request": request}
with Trace("send_connection_init", logger, request, kwargs):
self._send_connection_init(**kwargs)
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
with ShieldCancellation():
self.close()
raise exc
Expand Down Expand Up @@ -160,7 +165,7 @@ def handle_request(self, request: Request) -> Response:
"stream_id": stream_id,
},
)
except BaseException as exc: # noqa: PIE786
except EXCEPTION_OR_CANCELLED as exc: # noqa: PIE786
with ShieldCancellation():
kwargs = {"stream_id": stream_id}
with Trace("response_closed", logger, request, kwargs):
Expand Down Expand Up @@ -573,7 +578,7 @@ def __iter__(self) -> typing.Iterator[bytes]:
request=self._request, stream_id=self._stream_id
):
yield chunk
except BaseException as exc:
except EXCEPTION_OR_CANCELLED as exc:
# If we get an exception while streaming the response,
# we want to close the response (and possibly the connection)
# before raising that exception.
Expand Down
13 changes: 12 additions & 1 deletion httpcore/_synchronization.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
import threading
from types import TracebackType
from typing import Optional, Type
from typing import Optional, Tuple, Type

from ._exceptions import ExceptionMapping, PoolTimeout, map_exceptions

EXCEPTION_OR_CANCELLED: Tuple[Type[BaseException], ...] = (Exception,)

# Our async synchronization primatives use either 'anyio' or 'trio' depending
# on if they're running under asyncio or trio.

try:
import trio

EXCEPTION_OR_CANCELLED += (trio.Cancelled,)
except ImportError: # pragma: nocover
trio = None # type: ignore

try:
import anyio

try:
import asyncio

EXCEPTION_OR_CANCELLED += (asyncio.CancelledError,)
except ImportError: # pragma: nocover
pass
except ImportError: # pragma: nocover
anyio = None # type: ignore

Expand Down
Loading