-
-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix 'I/O operation on closed file' and 'Form data has been processed already' upon redirect on multipart data #9201
base: master
Are you sure you want to change the base?
Changes from all commits
be0970b
542d4bc
dd34bcb
fb2d3b8
4020c14
32e127d
f3e62c3
85fec6c
b75761c
320e508
f706927
c4da788
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix `I/O operation on closed file` and `Form data has been processed already` upon redirect on multipart data -- by :user:`GLGDLY`. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -307,17 +307,36 @@ def __init__( | |
if hdrs.CONTENT_DISPOSITION not in self.headers: | ||
self.set_content_disposition(disposition, filename=self._filename) | ||
|
||
self._writable = True | ||
|
||
try: | ||
self._seekable = self._value.seekable() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think |
||
except AttributeError: # https://github.com/python/cpython/issues/124293 | ||
self._seekable = False | ||
|
||
if self._seekable: | ||
self._stream_pos = self._value.tell() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: check if tell can be blocking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm pretty sure |
||
else: | ||
self._stream_pos = 0 | ||
|
||
async def write(self, writer: AbstractStreamWriter) -> None: | ||
loop = asyncio.get_event_loop() | ||
try: | ||
if self._seekable: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: see if executor jobs can be combined There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be combined into a single executor job. Example diff --git a/aiohttp/payload.py b/aiohttp/payload.py
index 395c44f6e..3b222b93e 100644
--- a/aiohttp/payload.py
+++ b/aiohttp/payload.py
@@ -319,15 +319,19 @@ class IOBasePayload(Payload):
else:
self._stream_pos = 0
- async def write(self, writer: AbstractStreamWriter) -> None:
- loop = asyncio.get_event_loop()
+ def _read_first(self) -> None:
+ """Read the first chunk of data from the stream."""
if self._seekable:
- await loop.run_in_executor(None, self._value.seek, self._stream_pos)
+ self._value.seek(self._stream_pos)
elif not self._writable:
raise RuntimeError(
f'Non-seekable IO payload "{self._value}" is already consumed (possibly due to redirect, consider storing in a seekable IO buffer instead)'
)
- chunk = await loop.run_in_executor(None, self._value.read, 2**16)
+ return self._value.read(2**16)
+
+ async def write(self, writer: AbstractStreamWriter) -> None:
+ loop = asyncio.get_running_loop()
+ chunk = await loop.run_in_executor(None, self._read_first)
while chunk:
await writer.write(chunk)
chunk = await loop.run_in_executor(None, self._value.read, 2**16) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to seek here if it's the first time? That seems like it will be the common case |
||
await loop.run_in_executor(None, self._value.seek, self._stream_pos) | ||
elif not self._writable: | ||
raise RuntimeError( | ||
f'Non-seekable IO payload "{self._value}" is already consumed (possibly due to redirect, consider storing in a seekable IO buffer instead)' | ||
) | ||
chunk = await loop.run_in_executor(None, self._value.read, 2**16) | ||
while chunk: | ||
await writer.write(chunk) | ||
chunk = await loop.run_in_executor(None, self._value.read, 2**16) | ||
while chunk: | ||
await writer.write(chunk) | ||
chunk = await loop.run_in_executor(None, self._value.read, 2**16) | ||
finally: | ||
await loop.run_in_executor(None, self._value.close) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: verify close will still always happen |
||
if not self._seekable: | ||
self._writable = False # Non-seekable IO `_value` can only be consumed once | ||
|
||
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: | ||
if self._seekable: | ||
self._value.seek(self._stream_pos) | ||
return "".join(r.decode(encoding, errors) for r in self._value.readlines()) | ||
|
||
|
||
|
@@ -354,40 +373,50 @@ def __init__( | |
@property | ||
def size(self) -> Optional[int]: | ||
try: | ||
return os.fstat(self._value.fileno()).st_size - self._value.tell() | ||
return os.fstat(self._value.fileno()).st_size - self._stream_pos | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: verify this doesn't run in the event loop There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like multipart will call this in the event loop from |
||
except OSError: | ||
return None | ||
|
||
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: | ||
if self._seekable: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need to seek on the first time? |
||
self._value.seek(self._stream_pos) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: verify this isn't called in the event loop as it does block |
||
return self._value.read() | ||
|
||
async def write(self, writer: AbstractStreamWriter) -> None: | ||
loop = asyncio.get_event_loop() | ||
try: | ||
if self._seekable: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to seek here if it's the first time? |
||
await loop.run_in_executor(None, self._value.seek, self._stream_pos) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: see if executor jobs can be combined There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Jobs can be combined like #9201 (comment) |
||
elif not self._writable: | ||
raise RuntimeError( | ||
f'Non-seekable IO payload "{self._value}" is already consumed (possibly due to redirect, consider storing in a seekable IO buffer instead)' | ||
) | ||
chunk = await loop.run_in_executor(None, self._value.read, 2**16) | ||
while chunk: | ||
data = ( | ||
chunk.encode(encoding=self._encoding) | ||
if self._encoding | ||
else chunk.encode() | ||
) | ||
await writer.write(data) | ||
chunk = await loop.run_in_executor(None, self._value.read, 2**16) | ||
while chunk: | ||
data = ( | ||
chunk.encode(encoding=self._encoding) | ||
if self._encoding | ||
else chunk.encode() | ||
) | ||
await writer.write(data) | ||
chunk = await loop.run_in_executor(None, self._value.read, 2**16) | ||
finally: | ||
await loop.run_in_executor(None, self._value.close) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: verify close still happens in failure |
||
if not self._seekable: | ||
self._writable = False # Non-seekable IO `_value` can only be consumed once | ||
|
||
|
||
class BytesIOPayload(IOBasePayload): | ||
_value: io.BytesIO | ||
|
||
@property | ||
def size(self) -> int: | ||
position = self._value.tell() | ||
end = self._value.seek(0, os.SEEK_END) | ||
self._value.seek(position) | ||
return end - position | ||
def size(self) -> Optional[int]: | ||
if self._seekable: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: verify this doesn't run in the event loop There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
end = self._value.seek(0, os.SEEK_END) | ||
self._value.seek(self._stream_pos) | ||
return end - self._stream_pos | ||
return None | ||
|
||
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: | ||
if self._seekable: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: make sure this is run in the executor |
||
self._value.seek(self._stream_pos) | ||
return self._value.read().decode(encoding, errors) | ||
|
||
|
||
|
@@ -397,7 +426,7 @@ class BufferedReaderPayload(IOBasePayload): | |
@property | ||
def size(self) -> Optional[int]: | ||
try: | ||
return os.fstat(self._value.fileno()).st_size - self._value.tell() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: make sure this is run in the executor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
return os.fstat(self._value.fileno()).st_size - self._stream_pos | ||
except (OSError, AttributeError): | ||
# data.fileno() is not supported, e.g. | ||
# io.BufferedReader(io.BytesIO(b'data')) | ||
|
@@ -406,6 +435,8 @@ def size(self) -> Optional[int]: | |
return None | ||
|
||
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: | ||
if self._seekable: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to self: verify this does not run in the event loop |
||
self._value.seek(self._stream_pos) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need to happen on the first attempt? |
||
return self._value.read().decode(encoding, errors) | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: benchmark this in case the non exceptional case is to raise an exception