add arg to enable blob chunking and allow custom chunk sizes #137

Closed
1 change: 1 addition & 0 deletions .gitignore
@@ -6,3 +6,4 @@ oras.egg-info/
env
__pycache__
.python-version
/venv
Contributor:
I don't think you need the leading slash for venv in the local directory.

43 changes: 38 additions & 5 deletions oras/provider.py
@@ -219,20 +219,23 @@ def upload_blob(
container: container_type,
layer: dict,
do_chunked: bool = False,
chunk_size=None,
Contributor:
This should default to the default chunk size instead of None. If it's a shared value, put it in defaults.py and reference it from both / several function inputs.
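The reviewer's suggestion can be sketched as follows; the module layout and names here (`DEFAULT_CHUNK_SIZE`, the simplified function bodies) are illustrative assumptions, not oras-py's actual code:

```python
# Sketch of a shared default: one constant (e.g. in defaults.py) referenced
# from every function signature, instead of each function checking for None.
DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024  # 16 MiB, matching 16777216 in the diff

def chunked_upload(blob, chunk_size=DEFAULT_CHUNK_SIZE):
    # Stand-in body: the real function PATCHes the blob in chunk_size pieces.
    return (blob, chunk_size)

def upload_blob(blob, do_chunked=False, chunk_size=DEFAULT_CHUNK_SIZE):
    # Both entry points share the same default, so callers see one value.
    if do_chunked:
        return chunked_upload(blob, chunk_size=chunk_size)
    return blob
```

A caller that passes nothing gets the shared default; a caller that overrides `chunk_size` overrides it consistently in both functions.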

refresh_headers: bool = True,
) -> requests.Response:
"""
Prepare and upload a blob.

Sizes > 1024 are uploaded via a chunked approach (post, patch+, put)
and <= 1024 is a single post then put.
Large artifacts can be uploaded via a chunked approach (post, patch+, put)
to registries that support it. Larger chunks generally give better throughput.
Set do_chunked=True for chunked upload.

:param blob: path to blob to upload
:type blob: str
:param container: parsed container URI
:type container: oras.container.Container or str
:param layer: dict from oras.oci.NewLayer
:type layer: dict
:param do_chunked: if true use chunked upload. This allows upload of larger oci artifacts.
Contributor:
If the caller wants to do a chunked upload, they can use the function directly.

:param refresh_headers: if true, headers are refreshed
:type refresh_headers: bool
"""
@@ -250,8 +253,13 @@ def upload_blob(
blob, container, layer, refresh_headers=refresh_headers
Contributor:
Please see the comment above:

 # Chunked for large, otherwise POST and PUT
# This is currently disabled unless the user asks for it, as
# it doesn't seem to work for all registries

Chunked was not working well for different registries so I am hesitant to add it back.

)
else:
print("chunked upload ON.")
Contributor:
If you want to add logging, please use the oras.logger.
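Swapping the print calls for the project logger might look like this sketch; it uses the stdlib logging module as a stand-in, since the exact oras.logger API is an assumption here rather than something shown in the diff:

```python
import logging

# Stand-in for `from oras.logger import logger`; the real project's logger
# is assumed to behave like a standard logging.Logger.
logger = logging.getLogger("oras")

def announce_chunked_upload(start: int) -> None:
    # Instead of print("chunked upload ON."), emit through the logger so
    # callers control verbosity with the usual logging configuration.
    logger.debug("chunked upload enabled; starting at byte %d", start)
```

With this pattern the messages disappear by default and surface only when the application raises the log level to DEBUG.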

response = self.chunked_upload(
blob, container, layer, refresh_headers=refresh_headers
blob,
container,
layer,
refresh_headers=refresh_headers,
chunk_size=chunk_size,
)

# If we have an empty layer digest and the registry didn't accept, just return dummy successful response
@@ -556,6 +564,7 @@ def chunked_upload(
container: oras.container.Container,
layer: dict,
refresh_headers: bool = True,
chunk_size=None,
) -> requests.Response:
"""
Upload via a chunked upload.
@@ -568,7 +577,14 @@
:type layer: dict
:param refresh_headers: if true, headers are refreshed
:type refresh_headers: bool
:param chunk_size: chunk size in bytes
:type chunk_size: int
"""

default_chunk_size = 16777216
Contributor:
Ditto here - this should be the default size, from defaults.py, in the function.

if chunk_size is None:
chunk_size = default_chunk_size

# Start an upload session
headers = {"Content-Type": "application/octet-stream", "Content-Length": "0"}
if not refresh_headers:
@@ -585,7 +601,9 @@
# Read the blob in chunks, for each do a patch
start = 0
with open(blob, "rb") as fd:
for chunk in oras.utils.read_in_chunks(fd):
for chunk in oras.utils.read_in_chunks(fd, chunk_size=chunk_size):
print("uploading chunk starting at " + str(start))
Contributor:
Please use the oras.logger here.


if not chunk:
break

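The loop above relies on oras.utils.read_in_chunks accepting a chunk_size keyword. As a rough mental model (a sketch, not the actual oras.utils implementation), such a helper is a small generator:

```python
import io

def read_in_chunks(file_object, chunk_size=16 * 1024 * 1024):
    """Yield successive chunk_size-byte pieces from an open binary file."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data

# Example: a 6-byte buffer read in 4-byte chunks yields two pieces.
chunks = list(read_in_chunks(io.BytesIO(b"abcdef"), chunk_size=4))
```

Because it yields lazily, only one chunk is held in memory at a time, which is what makes the chunked PATCH loop viable for large artifacts.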
@@ -689,6 +707,10 @@ def push(self, *args, **kwargs) -> requests.Response:
:type target: str
:param refresh_headers: if true or None, headers are refreshed
:type refresh_headers: bool
:param do_chunked: if true do chunked blob upload
Contributor:
This would better be exposed through a separate push_chunked or similar function, if it's a good idea to add it back.

Author:
I don't understand this comment. It would require duplicating ~ 150 lines of code which would be otherwise unchanged. Is that really what you want?

Author:
Alternatively, if you like the separate function approach better than using an argument, I can rename existing 'push' to '_push' and create new, small 'push' and 'push_chunked'. Let me know how I should proceed.

Contributor:
The initial design I had was fairly simple:

oras-py/oras/provider.py, lines 147 to 170 in 07ae3a1:
def _upload_blob(
self, blob: str, container: Union[str, oras.container.Container], layer: dict
) -> requests.Response:
"""
Prepare and upload a blob.
Sizes > 1024 are uploaded via a chunked approach (post, patch+, put)
and <= 1024 is a single post then put.
:param blob: path to blob to upload
:type blob: str
:param container: parsed container URI
:type container: oras.container.Container or str
:param layer: dict from oras.oci.NewLayer
:type layer: dict
"""
blob = os.path.abspath(blob)
container = self.get_container(container)
size = layer["size"]
# Chunked for large, otherwise POST and PUT
if size < 1024:
return self._put_upload(blob, container, layer)
return self._chunked_upload(blob, container, layer)
It dispatched based on size. I think we'd want to try a similar approach, with a boolean that defaults to False. For my comment above, I do think we want a clean separation of the two functionalities (akin to how it was done in the link above).

:type do_chunked: bool
:param chunk_size: chunk size in bytes
:type chunk_size: int
:param subject: optional subject reference
:type subject: oras.oci.Subject
"""
@@ -709,6 +731,12 @@ def push(self, *args, **kwargs) -> requests.Response:
if refresh_headers is None:
refresh_headers = True

do_chunked = kwargs.get("do_chunked")
if do_chunked is None:
do_chunked = False

chunk_size = kwargs.get("chunk_size")

# Upload files as blobs
for blob in kwargs.get("files", []):
# You can provide a blob + content type
@@ -755,7 +783,12 @@

# Upload the blob layer
response = self.upload_blob(
blob, container, layer, refresh_headers=refresh_headers
blob,
container,
layer,
refresh_headers=refresh_headers,
do_chunked=do_chunked,
chunk_size=chunk_size,
)
self._check_200_response(response)
