Skip to content

Commit 15e99b4

Browse files
authored
Merge pull request #292 from ynput/enhancement/291-yn-0247-server-timeouts-on-streamed-uploads
Transfer: Add retry options to upload and download
2 parents 42bd847 + eff9350 commit 15e99b4

File tree

2 files changed

+76
-18
lines changed

2 files changed

+76
-18
lines changed

ayon_api/server_api.py

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1329,7 +1329,7 @@ def delete(self, entrypoint: str, **kwargs):
13291329
def _endpoint_to_url(
13301330
self,
13311331
endpoint: str,
1332-
use_rest: Optional[bool] = True
1332+
use_rest: bool = True,
13331333
) -> str:
13341334
"""Cleanup endpoint and return full url to AYON server.
13351335
@@ -1347,25 +1347,51 @@ def _endpoint_to_url(
13471347
endpoint = endpoint.lstrip("/").rstrip("/")
13481348
if endpoint.startswith(self._base_url):
13491349
return endpoint
1350-
base_url = self._rest_url if use_rest else self._graphql_url
1350+
base_url = self._rest_url if use_rest else self._base_url
13511351
return f"{base_url}/{endpoint}"
13521352

13531353
def _download_file_to_stream(
1354-
self, url: str, stream, chunk_size, progress
1354+
self,
1355+
url: str,
1356+
stream: StreamType,
1357+
chunk_size: int,
1358+
progress: TransferProgress,
13551359
):
1356-
kwargs = {"stream": True}
1360+
headers = self.get_headers()
1361+
kwargs = {
1362+
"stream": True,
1363+
"headers": headers,
1364+
}
13571365
if self._session is None:
1358-
kwargs["headers"] = self.get_headers()
13591366
get_func = self._base_functions_mapping[RequestTypes.get]
13601367
else:
13611368
get_func = self._session_functions_mapping[RequestTypes.get]
13621369

1363-
with get_func(url, **kwargs) as response:
1364-
response.raise_for_status()
1365-
progress.set_content_size(response.headers["Content-length"])
1366-
for chunk in response.iter_content(chunk_size=chunk_size):
1367-
stream.write(chunk)
1368-
progress.add_transferred_chunk(len(chunk))
1370+
retries = self.get_default_max_retries()
1371+
for attempt in range(retries):
1372+
# Continue in download
1373+
offset = progress.get_transferred_size()
1374+
if offset > 0:
1375+
headers["Range"] = f"bytes={offset}-"
1376+
1377+
try:
1378+
with get_func(url, **kwargs) as response:
1379+
response.raise_for_status()
1380+
progress.set_content_size(
1381+
response.headers["Content-length"]
1382+
)
1383+
for chunk in response.iter_content(chunk_size=chunk_size):
1384+
stream.write(chunk)
1385+
progress.add_transferred_chunk(len(chunk))
1386+
break
1387+
1388+
except (
1389+
requests.exceptions.Timeout,
1390+
requests.exceptions.ConnectionError,
1391+
):
1392+
if attempt == retries:
1393+
raise
1394+
progress.next_attempt()
13691395

13701396
def download_file_to_stream(
13711397
self,
@@ -1399,7 +1425,7 @@ def download_file_to_stream(
13991425
if not chunk_size:
14001426
chunk_size = self.default_download_chunk_size
14011427

1402-
url = self._endpoint_to_url(endpoint)
1428+
url = self._endpoint_to_url(endpoint, use_rest=False)
14031429

14041430
if progress is None:
14051431
progress = TransferProgress()
@@ -1543,11 +1569,27 @@ def _upload_file(
15431569
if not chunk_size:
15441570
chunk_size = self.default_upload_chunk_size
15451571

1546-
response = post_func(
1547-
url,
1548-
data=self._upload_chunks_iter(stream, progress, chunk_size),
1549-
**kwargs
1550-
)
1572+
retries = self.get_default_max_retries()
1573+
response = None
1574+
for attempt in range(retries):
1575+
try:
1576+
response = post_func(
1577+
url,
1578+
data=self._upload_chunks_iter(
1579+
stream, progress, chunk_size
1580+
),
1581+
**kwargs
1582+
)
1583+
break
1584+
1585+
except (
1586+
requests.exceptions.Timeout,
1587+
requests.exceptions.ConnectionError,
1588+
):
1589+
if attempt == retries:
1590+
raise
1591+
progress.next_attempt()
1592+
progress.reset_transferred()
15511593

15521594
response.raise_for_status()
15531595
return response
@@ -1580,7 +1622,7 @@ def upload_file_from_stream(
15801622
requests.Response: Response object
15811623
15821624
"""
1583-
url = self._endpoint_to_url(endpoint)
1625+
url = self._endpoint_to_url(endpoint, use_rest=False)
15841626

15851627
# Create dummy object so the function does not have to check
15861628
# 'progress' variable everywhere

ayon_api/utils.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,7 @@ class TransferProgress:
796796
"""Object to store progress of download/upload from/to server."""
797797

798798
def __init__(self):
799+
self._attempt: int = 0
799800
self._started: bool = False
800801
self._transfer_done: bool = False
801802
self._transferred: int = 0
@@ -850,6 +851,17 @@ def set_started(self):
850851
if self._started:
851852
raise ValueError("Progress already started")
852853
self._started = True
854+
self._attempt = 1
855+
856+
def get_attempt(self) -> int:
857+
"""Find out which attempt of progress it is."""
858+
return self._attempt
859+
860+
def next_attempt(self) -> None:
861+
"""Start new attempt of progress."""
862+
if not self._started:
863+
raise ValueError("Progress did not start yet")
864+
self._attempt += 1
853865

854866
def get_transfer_done(self) -> bool:
855867
"""Transfer finished.
@@ -921,6 +933,10 @@ def set_transferred_size(self, transferred: int):
921933
"""
922934
self._transferred = transferred
923935

936+
def reset_transferred(self) -> None:
937+
"""Reset transferred size to initial value."""
938+
self._transferred = 0
939+
924940
def add_transferred_chunk(self, chunk_size: int):
925941
"""Add transferred chunk size in bytes.
926942

0 commit comments

Comments
 (0)