-
-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix 'I/O operation on closed file' and 'Form data has been processed already' upon redirect on multipart data #9201
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #9201 +/- ##
==========================================
+ Coverage 98.31% 98.32% +0.01%
==========================================
Files 107 107
Lines 34510 34765 +255
Branches 4100 4137 +37
==========================================
+ Hits 33929 34184 +255
Misses 410 410
Partials 171 171
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
At first glance, it looks like some reasonable changes for an awkward situation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is good, I'd like @bdraco to have a quick check as well though.
I'm on the way back home from Europe and will take a look this weekend if I'm not too jet lagged. Otherwise I'll take a look when the week starts |
async def write(self, writer: AbstractStreamWriter) -> None: | ||
loop = asyncio.get_event_loop() | ||
try: | ||
if self._seekable: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: see if executor jobs can be combined
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be combined into a single executor job. Example
diff --git a/aiohttp/payload.py b/aiohttp/payload.py
index 395c44f6e..3b222b93e 100644
--- a/aiohttp/payload.py
+++ b/aiohttp/payload.py
@@ -319,15 +319,19 @@ class IOBasePayload(Payload):
else:
self._stream_pos = 0
- async def write(self, writer: AbstractStreamWriter) -> None:
- loop = asyncio.get_event_loop()
+ def _read_first(self) -> None:
+ """Read the first chunk of data from the stream."""
if self._seekable:
- await loop.run_in_executor(None, self._value.seek, self._stream_pos)
+ self._value.seek(self._stream_pos)
elif not self._writable:
raise RuntimeError(
f'Non-seekable IO payload "{self._value}" is already consumed (possibly due to redirect, consider storing in a seekable IO buffer instead)'
)
- chunk = await loop.run_in_executor(None, self._value.read, 2**16)
+ return self._value.read(2**16)
+
+ async def write(self, writer: AbstractStreamWriter) -> None:
+ loop = asyncio.get_running_loop()
+ chunk = await loop.run_in_executor(None, self._read_first)
while chunk:
await writer.write(chunk)
chunk = await loop.run_in_executor(None, self._value.read, 2**16)
@@ -354,40 +373,50 @@ def __init__( | |||
@property | |||
def size(self) -> Optional[int]: | |||
try: | |||
return os.fstat(self._value.fileno()).st_size - self._value.tell() | |||
return os.fstat(self._value.fileno()).st_size - self._stream_pos |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: verify this doesn't run in the event loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like multipart will call this in the event loop from append_payload
via ClientRequest.update_body_from_data
return self._value.read() | ||
|
||
async def write(self, writer: AbstractStreamWriter) -> None: | ||
loop = asyncio.get_event_loop() | ||
try: | ||
if self._seekable: | ||
await loop.run_in_executor(None, self._value.seek, self._stream_pos) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: see if executor jobs can be combined
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jobs can be combined like #9201 (comment)
self._value.seek(position) | ||
return end - position | ||
def size(self) -> Optional[int]: | ||
if self._seekable: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: verify this doesn't run in the event loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: | ||
if self._seekable: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: make sure this is run in the executor
except OSError: | ||
return None | ||
|
||
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: | ||
if self._seekable: | ||
self._value.seek(self._stream_pos) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: verify this isn't called in the event loop as it does block
except OSError: | ||
return None | ||
|
||
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: | ||
if self._seekable: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to seek on the first time?
await writer.write(data) | ||
chunk = await loop.run_in_executor(None, self._value.read, 2**16) | ||
finally: | ||
await loop.run_in_executor(None, self._value.close) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: verify close still happens in failure
@@ -406,6 +435,8 @@ def size(self) -> Optional[int]: | |||
return None | |||
|
|||
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: | |||
if self._seekable: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: verify this does not run in the event loop
@@ -406,6 +435,8 @@ def size(self) -> Optional[int]: | |||
return None | |||
|
|||
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: | |||
if self._seekable: | |||
self._value.seek(self._stream_pos) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to happen on the first attempt?
self._writable = True | ||
|
||
try: | ||
self._seekable = self._value.seekable() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think seekable
can be blocking , at least aiofiles
delegates it the executor https://pypi.org/project/aiofiles/
It looks like there are a few places where blocking I/O is happening in the event loop that need to be addressed before we can move this forward |
after the discussion with cpython community, I think |
What do these changes do?
Fix 'I/O operation on closed file' and 'Form data has been processed already' upon redirect on multipart data, based on the discussion in #6853 .
This approach tries to pre-build Payload object for the data passing into the request, before the redirect
while True
loop starts, so we can reuse the same Payload object for the entire redirect chain. However, it is notable that I/O object is always an issue, so my thought here is to use theseek
operation on the I/O object to allow chunk writing on redirect requests.Yet, for non-seekable I/O objects, some possible solution are:
I think more discussion might be needed for non-seekable objects, so I just raise error in this PR first.
Another thing I think worth discussion is that I removed the
close()
operation on the I/O object in this PR due to the following reasons:StringIOPayload
do notclose
its I/O value in the original code,But I do think more discussions might be needed here.
Are there changes in behavior for the user?
No.
Is it a substantial burden for the maintainers to support this?
Yes.
Related issue number
Fixes #5577
Fixes #5530
Checklist
CONTRIBUTORS.txt
CHANGES/
foldername it
<issue_or_pr_num>.<type>.rst
(e.g.588.bugfix.rst
)if you don't have an issue number, change it to the pull request
number after creating the PR
.bugfix
: A bug fix for something the maintainers deemed animproper undesired behavior that got corrected to match
pre-agreed expectations.
.feature
: A new behavior, public APIs. That sort of stuff..deprecation
: A declaration of future API removals and breakingchanges in behavior.
.breaking
: When something public is removed in a breaking way.Could be deprecated in an earlier release.
.doc
: Notable updates to the documentation structure or buildprocess.
.packaging
: Notes for downstreams about unobvious side effectsand tooling. Changes in the test invocation considerations and
runtime assumptions.
.contrib
: Stuff that affects the contributor experience. e.g.Running tests, building the docs, setting up the development
environment.
.misc
: Changes that are hard to assign to any of the abovecategories.
Make sure to use full sentences with correct case and punctuation,
for example:
Use the past tense or the present tense a non-imperative mood,
referring to what's changed compared to the last released version
of this project.