Skip to content

Commit

Permalink
NC | NSFS | Add Concurrency Tests
Browse files Browse the repository at this point in the history
Signed-off-by: shirady <[email protected]>
  • Loading branch information
shirady committed Sep 23, 2024
1 parent a3b5ab8 commit 9cc8b90
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -2939,7 +2939,7 @@ class NamespaceFS {
break;
} catch (err) {
retries -= 1;
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
if (retries <= 0 || (!native_fs_utils.should_retry_link_unlink(is_gpfs, err) && err.code !== 'ENOENT')) throw err;
dbg.warn(`NamespaceFS._delete_latest_version: Retrying retries=${retries} latest_ver_path=${latest_ver_path}`, err);
} finally {
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
Expand Down
98 changes: 79 additions & 19 deletions src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,20 @@ function make_dummy_object_sdk(nsfs_config, uid, gid) {
};
}

const tmp_fs_path = path.join(TMP_PATH, 'test_versioning_concurrency');

const nsfs = new NamespaceFS({
bucket_path: tmp_fs_path,
bucket_id: '1',
namespace_resource_id: undefined,
access_mode: undefined,
versioning: 'ENABLED',
force_md5_etag: false,
stats: endpoint_stats_collector.instance(),
});

const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
describe('test versioning concurrency', () => {
const tmp_fs_path = path.join(TMP_PATH, 'test_versioning_concurrency');

const nsfs = new NamespaceFS({
bucket_path: tmp_fs_path,
bucket_id: '1',
namespace_resource_id: undefined,
access_mode: undefined,
versioning: 'ENABLED',
force_md5_etag: false,
stats: endpoint_stats_collector.instance(),
});

beforeEach(async () => {
await fs_utils.create_fresh_path(tmp_fs_path);
Expand All @@ -63,14 +64,9 @@ describe('test versioning concurrency', () => {
it('multiple delete version id and key', async () => {
const bucket = 'bucket1';
const key = 'key2';
const versions_arr = [];
// upload 5 versions of key2
for (let i = 0; i < 5; i++) {
const random_data = Buffer.from(String(i));
const body = buffer_utils.buffer_to_read_stream(random_data);
const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK).catch(err => console.log('put error - ', err));
versions_arr.push(res.etag);
}
const number_of_versions = 5;
const versions_arr = await _upload_versions(bucket, key, number_of_versions);

const mid_version_id = versions_arr[3];
const number_of_successful_operations = [];
for (let i = 0; i < 15; i++) {
Expand All @@ -81,4 +77,68 @@ describe('test versioning concurrency', () => {
await P.delay(1000);
expect(number_of_successful_operations.length).toBe(15);
});

it('concurrent delete of latest version', async () => {
    const bucket = 'bucket1';
    const key = 'key3';
    const number_of_versions = 5;
    const number_of_concurrent_deletes = 3;
    const versions_arr = await _upload_versions(bucket, key, number_of_versions);
    expect(versions_arr.length).toBe(number_of_versions);

    const successful_operations = [];
    const delete_promises = [];
    for (let i = 0; i < number_of_concurrent_deletes; i++) {
        // launch without awaiting so the deletes race each other (that is the point of the test)
        const delete_promise = nsfs.delete_object({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
            .then(res => successful_operations.push(res))
            .catch(err => console.log('delete latest version error - ', err));
        delete_promises.push(delete_promise);
    }
    // wait for every in-flight delete deterministically instead of a fixed 1-second sleep,
    // which was both slow and flaky under load
    await Promise.all(delete_promises);

    expect(successful_operations.length).toBe(number_of_concurrent_deletes);
    const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
    // 5 versions uploaded before + 3 delete markers created by the concurrent deletes
    expect(versions.objects.length).toBe(number_of_versions + number_of_concurrent_deletes);
    const delete_marker_arr = versions.objects.filter(object => object.delete_marker === true);
    expect(delete_marker_arr.length).toBe(number_of_concurrent_deletes);
});

it('concurrent put object and head object latest version', async () => {
    const bucket = 'bucket1';
    const key = 'key4';
    await _upload_versions(bucket, key, 1);

    const successful_operations = [];
    const operation_promises = [];
    const number_of_iterations = 5; // by changing it to 10 it sometimes fails
    for (let i = 0; i < number_of_iterations; i++) {
        const random_data = Buffer.from(String(i));
        const body = buffer_utils.buffer_to_read_stream(random_data);
        // launch the put and the head without awaiting so they race each other
        const put_promise = nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
            .then(res => successful_operations.push(res))
            .catch(err => console.log('multiple puts of the same key error - ', err));
        const head_promise = nsfs.read_object_md({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
            .then(res => successful_operations.push(res))
            .catch(err => console.log('multiple heads of the same key error - ', err));
        operation_promises.push(put_promise, head_promise);
    }
    // wait for all in-flight operations deterministically instead of a fixed 1-second sleep
    await Promise.all(operation_promises);

    const expected_number_of_successful_operations = number_of_iterations * 2;
    expect(successful_operations.length).toBe(expected_number_of_successful_operations);
    const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
    // 1 initial version + number_of_iterations concurrent puts
    // (the original comment said "10 versions concurrent" but number_of_iterations is 5)
    expect(versions.objects.length).toBe(number_of_iterations + 1);
});
});

/**
 * _upload_versions uploads number_of_versions of key in bucket with a body of random data
 * note: this function is not concurrent, it's a helper function for preparing a bucket with a couple of versions
 * @param {string} bucket
 * @param {string} key
 * @param {number} number_of_versions
 * @returns {Promise<string[]>} the etags of the uploaded versions, in upload order
 */
async function _upload_versions(bucket, key, number_of_versions) {
    const versions_arr = [];
    for (let i = 0; i < number_of_versions; i++) {
        const random_data = Buffer.from(String(i));
        const body = buffer_utils.buffer_to_read_stream(random_data);
        // let upload failures propagate: the previous silent .catch(console.log) made
        // res undefined, so the following res.etag threw an unhelpful TypeError and
        // the real upload error was only visible in the log
        const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK);
        versions_arr.push(res.etag);
    }
    return versions_arr;
}

0 comments on commit 9cc8b90

Please sign in to comment.