From 16cd2e8b9afe93ee553ee4b17b55e50e01008daf Mon Sep 17 00:00:00 2001 From: Romy <35330373+romayalon@users.noreply.github.com> Date: Thu, 19 Sep 2024 15:34:31 +0300 Subject: [PATCH] NC | Threaded Multiple Object Delete fix Signed-off-by: Romy <35330373+romayalon@users.noreply.github.com> --- src/sdk/namespace_fs.js | 6 ++- .../nsfs_s3_tests_pending_list.txt | 1 - .../test_versioning_concurrency.test.js | 39 +++++++++++++++++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index 8222680b33..1db5f3d999 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -2766,9 +2766,11 @@ class NamespaceFS { } return version_info; } catch (err) { - retries -= 1; - if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err; dbg.warn(`NamespaceFS._delete_single_object_versioned: retrying retries=${retries} file_path=${file_path}`, err); + retries -= 1; + // retry also on ENOENT if the version moved from .versions -> latest or the opposite/deleted concurrently + // on the next retry will return + if (retries <= 0 || (!native_fs_utils.should_retry_link_unlink(is_gpfs, err) && err.code !== 'ENOENT')) throw err; } finally { if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true); } diff --git a/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt b/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt index ab191c9776..bef4e7064b 100644 --- a/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt +++ b/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt @@ -42,7 +42,6 @@ s3tests_boto3/functional/test_s3select.py::test_alias_cyclic_refernce s3tests_boto3/functional/test_s3select.py::test_schema_definition s3tests_boto3/functional/test_s3select.py::test_progress_expressions s3tests_boto3/functional/test_s3.py::test_object_write_with_chunked_transfer_encoding -s3tests_boto3/functional/test_s3.py::test_versioning_concurrent_multi_object_delete s3tests_boto3/functional/test_s3.py::test_get_object_torrent s3tests_boto3/functional/test_s3select.py::test_count_json_operation s3tests_boto3/functional/test_s3select.py::test_column_sum_min_max diff --git a/src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js b/src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js index 022ab00ea9..dcb02b69e3 100644 --- a/src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js +++ b/src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js @@ -7,6 +7,7 @@ const fs_utils = require('../../../util/fs_utils'); const NamespaceFS = require('../../../sdk/namespace_fs'); const buffer_utils = require('../../../util/buffer_utils'); const { TMP_PATH } = require('../../system_tests/test_utils'); +const { crypto_random_string } = require('../../../util/string_utils'); const endpoint_stats_collector = require('../../../sdk/endpoint_stats_collector'); function make_dummy_object_sdk(nsfs_config, uid, gid) { @@ -81,4 +82,42 @@ describe('test versioning concurrency', () => { await P.delay(1000); expect(number_of_successful_operations.length).toBe(15); }); + + it('concurrent multi object delete', async () => { + const bucket = 'bucket1'; + const concurrency_num = 10; + const delete_objects_arr = []; + for (let i = 0; i < concurrency_num; i++) { + const key = `key${i}`; + const random_data = Buffer.from(String(crypto_random_string(7))); + const body = buffer_utils.buffer_to_read_stream(random_data); + const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK); + delete_objects_arr.push({ key: key, version_id: res.version_id }); + } + const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK); + + for (const { key, version_id } of delete_objects_arr) { + const found = versions.objects.find(object => object.key === key && object.version_id === version_id); + expect(found).toBeDefined(); + } + + const delete_responses = []; + const delete_errors = []; + + for (let i = 0; i < concurrency_num; i++) { + nsfs.delete_multiple_objects({ bucket, objects: delete_objects_arr }, DUMMY_OBJECT_SDK) + .then(res => delete_responses.push(res)) + .catch(err => delete_errors.push(err)); + } + await P.delay(5000); + expect(delete_responses.length).toBe(concurrency_num); + for (const res of delete_responses) { + expect(res.length).toBe(concurrency_num); + for (const single_delete_res of res) { + expect(single_delete_res.err_message).toBe(undefined); + } + } + const list_res = await nsfs.list_objects({ bucket: bucket }, DUMMY_OBJECT_SDK); + expect(list_res.objects.length).toBe(0); + }, 8000); });