Skip to content

Commit 60af644

Browse files
authored
Fix streaming download performance issue (#224)
2.12.915 (2024-04-02) ===================== - Fixed a performance issue when streaming download by chunk (size) not in phase with incoming packets size. See jawah/niquests#236
2 parents 82cd79d + 44c3ecf commit 60af644

File tree

9 files changed

+107
-119
lines changed

9 files changed

+107
-119
lines changed

CHANGES.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
2.12.915 (2024-04-02)
2+
=====================
3+
4+
- Fixed a performance issue when streaming download by chunk (size) not in phase with incoming packets size.
5+
See https://github.com/jawah/niquests/issues/236
6+
17
2.12.914 (2024-03-30)
28
=====================
39

docker-compose.win.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
proxy:
3-
image: traefik:v3.3.4-windowsservercore-ltsc2022
3+
image: traefik:v3.4.0-rc1-windowsservercore-ltsc2022
44
restart: unless-stopped
55
depends_on:
66
httpbin:

docker-compose.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
proxy:
3-
image: traefik:v3.3.4
3+
image: traefik:v3.4.0-rc1
44
restart: unless-stopped
55
depends_on:
66
httpbin:
@@ -86,8 +86,10 @@ services:
8686
- ./traefik:/usr/local/etc/haproxy
8787

8888
httpbin:
89-
image: mccutchen/go-httpbin:v2.16.1
89+
image: mccutchen/go-httpbin:v2.17.0
9090
restart: unless-stopped
91+
deploy:
92+
replicas: 6
9193
labels:
9294
- traefik.enable=true
9395
- traefik.http.routers.httpbin-http.rule=Host(`httpbin.local`) || Host(`alt.httpbin.local`)

src/urllib3/_async/response.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
ResponseNotReady,
2121
SSLError,
2222
)
23-
from ..response import BytesQueueBuffer, ContentDecoder, HTTPResponse
24-
from ..util.response import is_fp_closed
23+
from ..response import ContentDecoder, HTTPResponse
24+
from ..util.response import is_fp_closed, BytesQueueBuffer
2525
from ..util.retry import Retry
2626
from .connection import AsyncHTTPConnection
2727

src/urllib3/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
# This file is protected via CODEOWNERS
22
from __future__ import annotations
33

4-
__version__ = "2.12.914"
4+
__version__ = "2.12.915"

src/urllib3/backend/_async/_base.py

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from ..._collections import HTTPHeaderDict
66
from ...contrib.ssa import AsyncSocket, SSLAsyncSocket
77
from .._base import BaseBackend, ResponsePromise
8+
from ...util.response import BytesQueueBuffer
89

910

1011
class AsyncDirectStreamAccess:
@@ -176,7 +177,7 @@ def __init__(
176177

177178
self._stream_id = stream_id
178179

179-
self.__buffer_excess: bytes = b""
180+
self.__buffer_excess: BytesQueueBuffer = BytesQueueBuffer()
180181
self.__promise: ResponsePromise | None = None
181182
self._dsa = dsa
182183
self._stream_abort = stream_abort
@@ -226,35 +227,24 @@ async def read(self, __size: int | None = None) -> bytes:
226227
__size, self._stream_id
227228
)
228229

229-
# that's awkward, but rather no choice. the state machine
230-
# consume and render event regardless of your amt !
231-
if self.__buffer_excess:
232-
data = ( # Defensive: Difficult to put in place a scenario that verify this
233-
self.__buffer_excess + data
234-
)
235-
self.__buffer_excess = b"" # Defensive:
236-
else:
237-
if __size is None:
238-
data = self.__buffer_excess
239-
self.__buffer_excess = b""
240-
else:
241-
data = self.__buffer_excess[:__size]
242-
self.__buffer_excess = self.__buffer_excess[__size:]
243-
244-
if __size is not None and (0 < __size < len(data)):
245-
self.__buffer_excess = data[__size:]
246-
data = data[:__size]
247-
248-
if self._eot and len(self.__buffer_excess) == 0:
249-
self._stream_abort = None
250-
self.closed = True
230+
self.__buffer_excess.put(data)
231+
232+
buf_capacity = len(self.__buffer_excess)
233+
data = self.__buffer_excess.get(
234+
__size if __size is not None and __size > 0 else buf_capacity
235+
)
251236

252237
size_in = len(data)
253238

239+
buf_capacity -= size_in
240+
241+
if self._eot and buf_capacity == 0:
242+
self._stream_abort = None
243+
self.closed = True
244+
self._sock = None
245+
254246
if self.chunked:
255-
self.chunk_left = (
256-
len(self.__buffer_excess) if self.__buffer_excess else None
257-
)
247+
self.chunk_left = buf_capacity if buf_capacity else None
258248
elif self.length is not None:
259249
self.length -= size_in
260250

src/urllib3/backend/_base.py

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from .._collections import HTTPHeaderDict
1818
from .._constant import DEFAULT_BLOCKSIZE, DEFAULT_KEEPALIVE_DELAY
19+
from ..util.response import BytesQueueBuffer
1920

2021

2122
class HttpVersion(str, enum.Enum):
@@ -265,7 +266,7 @@ def __init__(
265266

266267
self._stream_id = stream_id
267268

268-
self.__buffer_excess: bytes = b""
269+
self.__buffer_excess: BytesQueueBuffer = BytesQueueBuffer()
269270
self.__promise: ResponsePromise | None = None
270271

271272
self.trailers: HTTPHeaderDict | None = None
@@ -337,36 +338,24 @@ def read(self, __size: int | None = None) -> bytes:
337338
__size, self._stream_id
338339
)
339340

340-
# that's awkward, but rather no choice. the state machine
341-
# consume and render event regardless of your amt !
342-
if self.__buffer_excess:
343-
data = ( # Defensive: Difficult to put in place a scenario that verify this
344-
self.__buffer_excess + data
345-
)
346-
self.__buffer_excess = b"" # Defensive:
347-
else:
348-
if __size is None:
349-
data = self.__buffer_excess
350-
self.__buffer_excess = b""
351-
else:
352-
data = self.__buffer_excess[:__size]
353-
self.__buffer_excess = self.__buffer_excess[__size:]
354-
355-
if __size is not None and (0 < __size < len(data)):
356-
self.__buffer_excess = data[__size:]
357-
data = data[:__size]
358-
359-
if self._eot and len(self.__buffer_excess) == 0:
341+
self.__buffer_excess.put(data)
342+
343+
buf_capacity = len(self.__buffer_excess)
344+
data = self.__buffer_excess.get(
345+
__size if __size is not None and __size > 0 else buf_capacity
346+
)
347+
348+
size_in = len(data)
349+
350+
buf_capacity -= size_in
351+
352+
if self._eot and buf_capacity == 0:
360353
self._stream_abort = None
361354
self.closed = True
362355
self._sock = None
363356

364-
size_in = len(data)
365-
366357
if self.chunked:
367-
self.chunk_left = (
368-
len(self.__buffer_excess) if self.__buffer_excess else None
369-
)
358+
self.chunk_left = buf_capacity if buf_capacity else None
370359
elif self.length is not None:
371360
self.length -= size_in
372361

src/urllib3/response.py

Lines changed: 1 addition & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import collections
43
import io
54
import json as _json
65
import logging
@@ -50,7 +49,7 @@
5049
ResponseNotReady,
5150
SSLError,
5251
)
53-
from .util.response import is_fp_closed
52+
from .util.response import is_fp_closed, BytesQueueBuffer
5453
from .util.retry import Retry
5554

5655
if typing.TYPE_CHECKING:
@@ -221,65 +220,6 @@ def _get_decoder(mode: str) -> ContentDecoder:
221220
return DeflateDecoder()
222221

223222

224-
class BytesQueueBuffer:
225-
"""Memory-efficient bytes buffer
226-
227-
To return decoded data in read() and still follow the BufferedIOBase API, we need a
228-
buffer to always return the correct amount of bytes.
229-
230-
This buffer should be filled using calls to put()
231-
232-
Our maximum memory usage is determined by the sum of the size of:
233-
234-
* self.buffer, which contains the full data
235-
* the largest chunk that we will copy in get()
236-
237-
The worst case scenario is a single chunk, in which case we'll make a full copy of
238-
the data inside get().
239-
"""
240-
241-
def __init__(self) -> None:
242-
self.buffer: typing.Deque[bytes] = collections.deque()
243-
self._size: int = 0
244-
245-
def __len__(self) -> int:
246-
return self._size
247-
248-
def put(self, data: bytes) -> None:
249-
self.buffer.append(data)
250-
self._size += len(data)
251-
252-
def get(self, n: int) -> bytes:
253-
if n == 0:
254-
return b""
255-
elif not self.buffer:
256-
raise RuntimeError("buffer is empty")
257-
elif n < 0:
258-
raise ValueError("n should be > 0")
259-
260-
fetched = 0
261-
ret = io.BytesIO()
262-
while fetched < n:
263-
remaining = n - fetched
264-
chunk = self.buffer.popleft()
265-
chunk_length = len(chunk)
266-
if remaining < chunk_length:
267-
left_chunk, right_chunk = chunk[:remaining], chunk[remaining:]
268-
ret.write(left_chunk)
269-
self.buffer.appendleft(right_chunk)
270-
self._size -= remaining
271-
break
272-
else:
273-
ret.write(chunk)
274-
self._size -= chunk_length
275-
fetched += chunk_length
276-
277-
if not self.buffer:
278-
break
279-
280-
return ret.getvalue()
281-
282-
283223
class HTTPResponse(io.IOBase):
284224
"""
285225
HTTP Response container.

src/urllib3/util/response.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
import collections
4+
import io
35
import re
46
import typing
57

@@ -44,3 +46,62 @@ def parse_alt_svc(value: str) -> typing.Iterable[tuple[str, str]]:
4446
)
4547

4648
yield from re.findall(pattern, value)
49+
50+
51+
class BytesQueueBuffer:
    """Memory-efficient FIFO byte buffer backed by a deque of chunks.

    Chunks are appended via put() and consumed via get(n); get() returns at
    most *n* bytes, splitting the head chunk when necessary so that only the
    requested amount is ever copied out.

    Peak memory usage is bounded by the queued data plus the largest single
    chunk copied inside get(); the worst case (one big chunk) is a full copy.
    """

    def __init__(self) -> None:
        # Queued chunks, oldest first.
        self.buffer: collections.deque[bytes] = collections.deque()
        # Total byte count across all queued chunks.
        self._size: int = 0

    def __len__(self) -> int:
        return self._size

    def put(self, data: bytes) -> None:
        """Append *data* to the tail of the queue."""
        self.buffer.append(data)
        self._size += len(data)

    def get(self, n: int) -> bytes:
        """Pop and return up to *n* bytes from the head of the queue.

        :raises RuntimeError: when the buffer is empty (checked before the
            sign of *n*, so an empty buffer wins over a negative *n*).
        :raises ValueError: when *n* is negative.
        """
        if n == 0:
            return b""
        if not self.buffer:
            raise RuntimeError("buffer is empty")
        if n < 0:
            raise ValueError("n should be > 0")

        out = bytearray()
        while self.buffer and len(out) < n:
            head = self.buffer.popleft()
            want = n - len(out)
            if len(head) > want:
                # Split: hand back only what was asked for and keep the
                # unread tail of this chunk queued for the next call.
                out += head[:want]
                self.buffer.appendleft(head[want:])
                self._size -= want
            else:
                out += head
                self._size -= len(head)
        return bytes(out)

0 commit comments

Comments
 (0)