Skip to content

Commit

Permalink
Add VSICopyFileRestartable() to allow restart of upload of large files
Browse files Browse the repository at this point in the history
Copy a source file into a target file in a way that can (potentially)
be restarted.

This function provides the possibility of efficiently restarting upload of
large files to cloud storage that implements upload in a chunked way,
such as /vsis3/ and /vsigs/.
For other destination file systems, this function may fallback to
VSICopyFile() and not provide any smart restartable implementation.

Example of a potential workflow:
```cpp
char* pszOutputPayload = NULL;
int ret = VSICopyFileRestartable(pszSource, pszTarget, NULL,
                                 &pszOutputPayload, NULL, NULL, NULL);
while( ret == 1 ) // add also a limiting counter to avoid potentiall endless looping
{
    // TODO: wait for some time

    char* pszOutputPayloadNew = NULL;
    const char* pszInputPayload = pszOutputPayload;
    ret = VSICopyFileRestartable(pszSource, pszTarget, pszInputPayload,
                                 &pszOutputPayloadNew, NULL, NULL, NULL);
    VSIFree(pszOutputPayload);
    pszOutputPayload = pszOutputPayloadNew;
}
VSIFree(pszOutputPayload);
```
  • Loading branch information
rouault committed May 3, 2024
1 parent a2d6e4d commit 2d38652
Show file tree
Hide file tree
Showing 10 changed files with 880 additions and 20 deletions.
23 changes: 23 additions & 0 deletions autotest/gcore/vsifile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1341,3 +1341,26 @@ def test_vsifile_use_closed_file(tmp_path):

with pytest.raises(ValueError, match="closed file"):
gdal.VSIFWriteL("0123456789", 1, 10, f)


###############################################################################
# Test gdal.CopyFileRestartable()


def test_vsifile_CopyFileRestartable(tmp_vsimem):

dstfilename = str(tmp_vsimem / "out.txt")

retcode, output_payload = gdal.CopyFileRestartable(
str(tmp_vsimem / "i_do_not_exist.txt"), dstfilename, None
)
assert retcode == -1
assert output_payload is None
assert gdal.VSIStatL(dstfilename) is None

srcfilename = str(tmp_vsimem / "in.txt")
gdal.FileFromMemBuffer(srcfilename, "foo")
retcode, output_payload = gdal.CopyFileRestartable(srcfilename, dstfilename, None)
assert retcode == 0
assert output_payload is None
assert gdal.VSIStatL(dstfilename).size == 3
25 changes: 25 additions & 0 deletions autotest/gcore/vsioss.py
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,31 @@ def test_vsioss_8(server):
assert stat.S_ISDIR(gdal.VSIStatL("/vsioss/vsioss_8/test/").mode)


###############################################################################
# Test gdal.CopyFileRestartable() with fallback to regular copy


def test_vsioss_CopyFileRestartable_fallback_to_regular_copy(tmp_vsimem, server):

gdal.VSICurlClearCache()

srcfilename = str(tmp_vsimem / "foo")
gdal.FileFromMemBuffer(srcfilename, "foo\n")

dstfilename = "/vsioss/test_bucket/foo"

handler = webserver.SequentialHandler()
handler.add("PUT", "/test_bucket/foo", 200, expected_body=b"foo\n")

with webserver.install_http_handler(handler):
ret_code, restart_payload = gdal.CopyFileRestartable(
srcfilename,
dstfilename,
None, # input payload
)
assert ret_code == 0


###############################################################################
# Nominal cases (require valid credentials)

Expand Down
328 changes: 328 additions & 0 deletions autotest/gcore/vsis3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4213,6 +4213,334 @@ def test_vsis3_fake_sync_multithreaded_upload_chunk_size_failure(
)


###############################################################################
# Test gdal.CopyFileRestartable() where upload is completed in a single attempt


def test_vsis3_CopyFileRestartable_no_error(
tmp_vsimem, aws_test_config, webserver_port
):

gdal.VSICurlClearCache()

srcfilename = str(tmp_vsimem / "foo")
gdal.FileFromMemBuffer(srcfilename, "foo\n")

dstfilename = "/vsis3/test_bucket/foo"

handler = webserver.SequentialHandler()
handler.add(
"POST",
"/test_bucket/foo?uploads",
200,
{"Content-type": "application:/xml"},
b"""<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
<UploadId>my_id</UploadId>
</InitiateMultipartUploadResult>""",
)
handler.add(
"PUT",
"/test_bucket/foo?partNumber=1&uploadId=my_id",
200,
{"ETag": '"first_etag"'},
expected_headers={"Content-Length": "4"},
expected_body=b"foo\n",
)
handler.add("POST", "/test_bucket/foo?uploadId=my_id", 200)

with webserver.install_http_handler(handler):
ret_code, restart_payload = gdal.CopyFileRestartable(
srcfilename, dstfilename, None
)
assert ret_code == 0
assert restart_payload is None


###############################################################################
# Test gdal.CopyFileRestartable() with one restart to complete the upload


@pytest.mark.parametrize("failure_reason", ["progress_cbk", "failed_part_put"])
def test_vsis3_CopyFileRestartable_with_restart(
tmp_vsimem, aws_test_config, webserver_port, failure_reason
):

gdal.VSICurlClearCache()

srcfilename = str(tmp_vsimem / "foo")
gdal.FileFromMemBuffer(srcfilename, "foo\n")

dstfilename = "/vsis3/test_bucket/foo"

def progress_cbk(pct, msg, user_data):
if failure_reason == "progress_cbk":
return pct < 0.5
else:
return True

handler = webserver.SequentialHandler()
handler.add(
"POST",
"/test_bucket/foo?uploads",
200,
{"Content-type": "application:/xml"},
b"""<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
<UploadId>my_id</UploadId>
</InitiateMultipartUploadResult>""",
)
handler.add(
"PUT",
"/test_bucket/foo?partNumber=1&uploadId=my_id",
200,
{"ETag": '"first_etag"'},
expected_headers={"Content-Length": "3"},
expected_body=b"foo",
)
if failure_reason == "failed_part_put":
handler.add(
"PUT",
"/test_bucket/foo?partNumber=2&uploadId=my_id",
400,
expected_headers={"Content-Length": "1"},
expected_body=b"\n",
)
with webserver.install_http_handler(handler), gdal.quiet_errors():
ret_code, restart_payload = gdal.CopyFileRestartable(
srcfilename,
dstfilename,
None, # input payload
["CHUNK_SIZE=3"],
progress_cbk,
)
assert ret_code == 1
j = json.loads(restart_payload)
assert "source_mtime" in j
del j["source_mtime"]
assert j == {
"type": "CopyFileRestartablePayload",
"source": srcfilename,
"target": dstfilename,
"source_size": 4,
"chunk_size": 3,
"upload_id": "my_id",
"chunk_etags": ['"first_etag"', None],
}

handler = webserver.SequentialHandler()
handler.add(
"PUT",
"/test_bucket/foo?partNumber=2&uploadId=my_id",
200,
{"ETag": '"second_etag"'},
expected_headers={"Content-Length": "1"},
expected_body=b"\n",
)
handler.add(
"POST",
"/test_bucket/foo?uploadId=my_id",
200,
expected_body=b"""<CompleteMultipartUpload>
<Part>
<PartNumber>1</PartNumber><ETag>"first_etag"</ETag></Part>
<Part>
<PartNumber>2</PartNumber><ETag>"second_etag"</ETag></Part>
</CompleteMultipartUpload>
""",
)
with webserver.install_http_handler(handler):
ret_code, restart_payload = gdal.CopyFileRestartable(
srcfilename, dstfilename, restart_payload
)
assert ret_code == 0
assert restart_payload is None


###############################################################################
# Test gdal.CopyFileRestartable() with error cases


def test_vsis3_CopyFileRestartable_src_file_does_not_exist(
tmp_vsimem, aws_test_config, webserver_port
):

gdal.VSICurlClearCache()

with gdal.quiet_errors():
ret_code, restart_payload = gdal.CopyFileRestartable(
"/vsimem/i/do/not/exist", "/vsis3/test_bucket/dst", None
)
assert ret_code == -1
assert restart_payload is None


###############################################################################
# Test gdal.CopyFileRestartable() with error cases


def test_vsis3_CopyFileRestartable_InitiateMultipartUpload_failed(
tmp_vsimem, aws_test_config, webserver_port
):

gdal.VSICurlClearCache()

srcfilename = str(tmp_vsimem / "foo")
gdal.FileFromMemBuffer(srcfilename, "foo\n")

dstfilename = "/vsis3/test_bucket/foo"

handler = webserver.SequentialHandler()
handler.add("POST", "/test_bucket/foo?uploads", 400)
with webserver.install_http_handler(handler), gdal.quiet_errors():
ret_code, restart_payload = gdal.CopyFileRestartable(
srcfilename,
dstfilename,
None, # input payload
)
assert ret_code == -1


###############################################################################
# Test gdal.CopyFileRestartable() with error cases


def test_vsis3_CopyFileRestartable_CompleteMultipartUpload_failed(
tmp_vsimem, aws_test_config, webserver_port
):

gdal.VSICurlClearCache()

srcfilename = str(tmp_vsimem / "foo")
gdal.FileFromMemBuffer(srcfilename, "foo\n")

dstfilename = "/vsis3/test_bucket/foo"

handler = webserver.SequentialHandler()
handler.add(
"POST",
"/test_bucket/foo?uploads",
200,
{"Content-type": "application:/xml"},
b"""<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
<UploadId>my_id</UploadId>
</InitiateMultipartUploadResult>""",
)
handler.add(
"PUT",
"/test_bucket/foo?partNumber=1&uploadId=my_id",
200,
{"ETag": '"first_etag"'},
expected_headers={"Content-Length": "4"},
expected_body=b"foo\n",
)
handler.add("POST", "/test_bucket/foo?uploadId=my_id", 400)
handler.add("DELETE", "/test_bucket/foo?uploadId=my_id", 200)

with webserver.install_http_handler(handler), gdal.quiet_errors():
ret_code, restart_payload = gdal.CopyFileRestartable(
srcfilename,
dstfilename,
None, # input payload
)
assert ret_code == -1


###############################################################################
# Test gdal.CopyFileRestartable() with errors in input payload


@pytest.mark.parametrize(
"key,value,error_msg",
[
("source", None, "'source' field in input payload does not match pszSource"),
("target", None, "'target' field in input payload does not match pszTarget"),
("chunk_size", None, "'chunk_size' field in input payload missing or invalid"),
(
"source_size",
None,
"'source_size' field in input payload does not match source file size",
),
(
"source_mtime",
None,
"'source_mtime' field in input payload does not match source file modification time",
),
("upload_id", None, "'upload_id' field in input payload missing or invalid"),
(
"chunk_etags",
None,
"'chunk_etags' field in input payload missing or invalid",
),
(
"chunk_etags",
[],
"'chunk_etags' field in input payload has not expected size",
),
],
)
def test_vsis3_CopyFileRestartable_errors_input_payload(
tmp_vsimem, key, value, error_msg
):

srcfilename = str(tmp_vsimem / "foo")
gdal.FileFromMemBuffer(srcfilename, "foo\n")

dstfilename = "/vsis3/test_bucket/foo"

j = {
"type": "CopyFileRestartablePayload",
"source": srcfilename,
"target": dstfilename,
"source_size": 4,
"chunk_size": 3,
"upload_id": "my_id",
"chunk_etags": ['"first_etag"', None],
}
j["source_mtime"] = gdal.VSIStatL(srcfilename).mtime

j[key] = value
restart_payload = json.dumps(j)

gdal.ErrorReset()
with gdal.quiet_errors():
ret_code, restart_payload = gdal.CopyFileRestartable(
srcfilename, dstfilename, restart_payload
)
assert ret_code == -1
assert restart_payload is None
assert gdal.GetLastErrorMsg() == error_msg


###############################################################################
# Test gdal.CopyFileRestartable() with /vsis3/ to /vsis3/


def test_vsis3_CopyFileRestartable_server_side(
tmp_vsimem, aws_test_config, webserver_port
):

gdal.VSICurlClearCache()

srcfilename = "/vsis3/test_bucket/src"
dstfilename = "/vsis3/test_bucket/dst"

handler = webserver.SequentialHandler()
handler.add(
"PUT",
"/test_bucket/dst",
200,
expected_headers={"x-amz-copy-source": "/test_bucket/src"},
)
with webserver.install_http_handler(handler):
ret_code, restart_payload = gdal.CopyFileRestartable(
srcfilename, dstfilename, None
)
assert ret_code == 0
assert restart_payload is None


###############################################################################
# Test reading/writing metadata

Expand Down
Loading

0 comments on commit 2d38652

Please sign in to comment.