Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NC | NSFS | Add Concurrency Tests #8394

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 97 additions & 21 deletions src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
'use strict';

const path = require('path');
const config = require('../../../../config');
const P = require('../../../util/promise');
const fs_utils = require('../../../util/fs_utils');
const NamespaceFS = require('../../../sdk/namespace_fs');
Expand All @@ -26,60 +27,63 @@ function make_dummy_object_sdk(nsfs_config, uid, gid) {
};
}

const DUMMY_OBJECT_SDK = make_dummy_object_sdk(true);
describe('test versioning concurrency', () => {
const tmp_fs_path = path.join(TMP_PATH, 'test_versioning_concurrency');

const nsfs = new NamespaceFS({
bucket_path: tmp_fs_path,
bucket_id: '1',
namespace_resource_id: undefined,
access_mode: undefined,
versioning: 'ENABLED',
force_md5_etag: false,
stats: endpoint_stats_collector.instance(),
});
const prior_value_of_nsfs_rename_retries = config.NSFS_RENAME_RETRIES;

// Start every test from an empty, freshly created bucket directory so tests
// do not see each other's objects or versions.
beforeEach(async () => {
await fs_utils.create_fresh_path(tmp_fs_path);
});

// Remove the bucket directory and restore NSFS_RENAME_RETRIES, since
// individual tests may override it (e.g. raising it for high-contention runs).
afterEach(async () => {
await fs_utils.folder_delete(tmp_fs_path);
config.NSFS_RENAME_RETRIES = prior_value_of_nsfs_rename_retries;
});

it('multiple puts of the same key', async () => {
const bucket = 'bucket1';
const key = 'key1';
const failed_operations = [];
for (let i = 0; i < 5; i++) {
const random_data = Buffer.from(String(i));
const body = buffer_utils.buffer_to_read_stream(random_data);
nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
.catch(err => failed_operations.push(err));
}
await P.delay(1000);
expect(failed_operations.length).toBe(0);
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
expect(versions.objects.length).toBe(5);
});

it('multiple delete version id and key', async () => {
const bucket = 'bucket1';
const key = 'key2';
const number_of_versions = 5;
const versions_arr = await _upload_versions(bucket, key, number_of_versions);

const mid_version_id = versions_arr[3];
const number_of_successful_operations = [];
const failed_operations = [];
for (let i = 0; i < 15; i++) {
nsfs.delete_object({ bucket: bucket, key: key, version_id: mid_version_id }, DUMMY_OBJECT_SDK)
.then(res => number_of_successful_operations.push(res))
.catch(err => failed_operations.push(err));
}
await P.delay(1000);
expect(failed_operations.length).toBe(0);
expect(number_of_successful_operations.length).toBe(15);
});

Expand Down Expand Up @@ -124,4 +128,76 @@ describe('test versioning concurrency', () => {
const list_res = await nsfs.list_objects({ bucket: bucket }, DUMMY_OBJECT_SDK);
expect(list_res.objects.length).toBe(0);
}, 8000);

it('concurrent delete of latest version', async () => {
const bucket = 'bucket1';
const key = 'key3';
const number_of_versions = 5;
const versions_arr = await _upload_versions(bucket, key, number_of_versions);
expect(versions_arr.length).toBe(number_of_versions);

const successful_operations = [];
const failed_operations = [];
for (let i = 0; i < 3; i++) {
nsfs.delete_object({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
.then(res => successful_operations.push(res))
.catch(err => failed_operations.push(err));
}

await P.delay(1000);
expect(failed_operations.length).toBe(0);
expect(successful_operations.length).toBe(3);
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
expect(versions.objects.length).toBe(8); // 5 versions before + 3 delete markers concurrent
const delete_marker_arr = versions.objects.filter(object => object.delete_marker === true);
expect(delete_marker_arr.length).toBe(3);
});

it('concurrent put object and head object latest version', async () => {
const bucket = 'bucket1';
const key = 'key4';
// seed a single existing version so the heads below always have a latest version to read
await _upload_versions(bucket, key, 1);

const successful_put_operations = [];
const successful_head_operations = [];
const failed_put_operations = [];
const failed_head_operations = [];
const number_of_iterations = 10;
// raise the retry budget for this test — presumably the concurrent renames under
// contention need more attempts than the default; restored in afterEach
config.NSFS_RENAME_RETRIES = 40;
for (let i = 0; i < number_of_iterations; i++) {
const random_data = Buffer.from(String(i));
const body = buffer_utils.buffer_to_read_stream(random_data);
// intentionally NOT awaited: fire each put and head concurrently to race them
nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
.then(res => successful_put_operations.push(res))
.catch(err => failed_put_operations.push(err));
nsfs.read_object_md({ bucket: bucket, key: key }, DUMMY_OBJECT_SDK)
.then(res => successful_head_operations.push(res))
.catch(err => failed_head_operations.push(err));
}
// give the floating operations time to settle before asserting
await P.delay(1000);
expect(failed_put_operations.length).toBe(0);
expect(failed_head_operations.length).toBe(0);
expect(successful_head_operations.length).toBe(number_of_iterations);
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
expect(versions.objects.length).toBe(number_of_iterations + 1); // 1 version before + 10 versions concurrent
});
});

/**
 * _upload_versions uploads number_of_versions of key in bucket with a body of random data
 * note: this function is not concurrent, it's a helper function for preparing a bucket with a couple of versions
 * @param {string} bucket
 * @param {string} key
 * @param {number} number_of_versions
 * @returns {Promise<string[]>} etags of the uploaded versions, in upload order
 */
async function _upload_versions(bucket, key, number_of_versions) {
    const versions_arr = [];
    for (let i = 0; i < number_of_versions; i++) {
        const random_data = Buffer.from(String(i));
        const body = buffer_utils.buffer_to_read_stream(random_data);
        // Let upload failures reject: the previous catch-and-log resolved to
        // undefined, and the res.etag access below then threw a TypeError that
        // masked the real upload error. Propagating fails the test with the
        // actual cause instead.
        const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK);
        versions_arr.push(res.etag);
    }
    return versions_arr;
}