Skip to content

Commit

Permalink
NSFS | versioning | add directory content versioning PUT action
Browse files Browse the repository at this point in the history
Signed-off-by: nadav mizrahi <[email protected]>
  • Loading branch information
nadavMiz committed Feb 4, 2025
1 parent 3fc59d6 commit 85ff011
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 19 deletions.
1 change: 1 addition & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ config.NSFS_RANDOM_DELAY_BASE = 70;

config.NSFS_VERSIONING_ENABLED = true;
config.NSFS_UPDATE_ISSUES_REPORT_ENABLED = true;
config.NSFS_CONTENT_DIRECTORY_VERSIONING_ENABLED = false;

config.NSFS_EXIT_EVENTS_TIME_FRAME_MIN = 24 * 60; // per day
config.NSFS_MAX_EXIT_EVENTS_PER_TIME_FRAME = 10; // allow max 10 failed forks per day
Expand Down
69 changes: 53 additions & 16 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ function to_fs_xattr(xattr) {
return _.mapKeys(xattr, (val, key) => XATTR_USER_PREFIX + key);
}

/**
 * filter_fs_xattr keeps only the xattr entries whose key starts with XATTR_NOOBAA_INTERNAL_PREFIX
 * @param {Object} xattr - map of xattr key to value
 * @returns {Object} the entries of xattr that passed the prefix check
 */
function filter_fs_xattr(xattr) {
    const has_internal_prefix = (xattr_value, xattr_key) => xattr_key?.startsWith(XATTR_NOOBAA_INTERNAL_PREFIX);
    return _.pickBy(xattr, has_internal_prefix);
}

/**
* get_random_delay returns a random delay number between base + min and max
* @param {number} base
Expand Down Expand Up @@ -1199,7 +1203,7 @@ class NamespaceFS {
try {
await this._check_path_in_bucket_boundaries(fs_context, file_path);

if (this.empty_dir_content_flow(file_path, params)) {
if (this.should_use_empty_content_dir_optimization() && this.empty_dir_content_flow(file_path, params)) {
const content_dir_info = await this._create_empty_dir_content(fs_context, params, file_path);
return content_dir_info;
}
Expand Down Expand Up @@ -1337,7 +1341,8 @@ class NamespaceFS {
const part_upload = file_path === upload_path;
const same_inode = params.copy_source && copy_res === COPY_STATUS_ENUM.SAME_INODE;
const should_replace_xattr = params.copy_source ? copy_res === COPY_STATUS_ENUM.FALLBACK : true;
const is_dir_content = this._is_directory_content(file_path, params.key);
const is_dir_content_unoptimized_flow = this._is_directory_content(file_path, params.key) &&
this.should_use_empty_content_dir_optimization();

const stat = await target_file.stat(fs_context);
this._verify_encryption(params.encryption, this._get_encryption_info(stat));
Expand Down Expand Up @@ -1380,18 +1385,21 @@ class NamespaceFS {
});
}
}
if (fs_xattr && !is_dir_content && should_replace_xattr) await target_file.replacexattr(fs_context, fs_xattr);
if (fs_xattr && !is_dir_content_unoptimized_flow && should_replace_xattr) {
await target_file.replacexattr(fs_context, fs_xattr);
}
// fsync
if (config.NSFS_TRIGGER_FSYNC) await target_file.fsync(fs_context);
dbg.log1('NamespaceFS._finish_upload:', open_mode, file_path, upload_path, fs_xattr);

if (!same_inode && !part_upload) {
await this._move_to_dest(fs_context, upload_path, file_path, target_file, open_mode, params.key, is_dir_content);
await this._move_to_dest(fs_context, upload_path, file_path, target_file, open_mode, params.key,
is_dir_content_unoptimized_flow);
}

// when object is a dir, xattr are set on the folder itself and the content is in .folder file
// we still should put the xattr if copy is link/same inode because we put the xattr on the directory
if (is_dir_content) {
if (is_dir_content_unoptimized_flow) {
await this._assign_dir_content_to_xattr(fs_context, fs_xattr, { ...params, size: stat.size }, copy_xattr);
}
stat.xattr = { ...stat.xattr, ...fs_xattr };
Expand Down Expand Up @@ -1419,16 +1427,14 @@ class NamespaceFS {
}

// move to dest GPFS (wt) / POSIX (w / undefined) - non part upload
async _move_to_dest(fs_context, source_path, dest_path, target_file, open_mode, key, is_dir_content) {
dbg.log2('_move_to_dest', fs_context, source_path, dest_path, target_file, open_mode, key, is_dir_content);
async _move_to_dest(fs_context, source_path, dest_path, target_file, open_mode, key, is_old_dir_flow) {
dbg.log2('_move_to_dest', fs_context, source_path, dest_path, target_file, open_mode, key, is_old_dir_flow);
let retries = config.NSFS_RENAME_RETRIES;
// will retry renaming a file in case of parallel deleting of the destination path
for (;;) {
try {
await native_fs_utils._make_path_dirs(dest_path, fs_context);
if (this._is_versioning_disabled() ||
(this._is_versioning_enabled() && is_dir_content)) {
// dir_content is not supported in versioning, hence we will treat it like versioning disabled
if (this._is_versioning_disabled() || is_old_dir_flow) {
if (open_mode === 'wt') {
await target_file.linkfileat(fs_context, dest_path);
} else {
Expand Down Expand Up @@ -1471,6 +1477,7 @@ class NamespaceFS {
dbg.log1('Namespace_fs._move_to_dest_version:', new_ver_tmp_path, latest_ver_path, upload_file);
let gpfs_options;
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
const is_dir = this._is_directory_content(latest_ver_path, key);
let retries = config.NSFS_RENAME_RETRIES;
for (;;) {
try {
Expand All @@ -1490,7 +1497,7 @@ class NamespaceFS {
latest_ver_info = await this._get_version_info(fs_context, latest_ver_path);
}
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
const versioned_path = latest_ver_info && this._get_version_path(key, latest_ver_info.version_id_str);
const versioned_path = latest_ver_info && this._get_version_path(key, latest_ver_info.version_id_str, is_dir);
dbg.log1('Namespace_fs._move_to_dest_version:', latest_ver_info, new_ver_info, gpfs_options);

if (this._is_versioning_suspended()) {
Expand All @@ -1513,6 +1520,9 @@ class NamespaceFS {
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
gpfs_options?.move_to_versions, bucket_tmp_dir_path);
}
if (is_dir) {
await this.handle_latest_disabled_dir_content_xattr(key, fs_context, versioned_path, latest_ver_path);
}
try {
// move new version to latest_ver_path (key path)
await native_fs_utils.safe_move(fs_context, new_ver_tmp_path, latest_ver_path, new_ver_info,
Expand All @@ -1536,6 +1546,26 @@ class NamespaceFS {
}
}

// handle xattr of content dir when moving from disabled to enabled mode.
// in the disabled version of content dir, the xattr is on the directory itself. so need to move it seperatly from the obejct
// in case of enabled mode we need to move the xattr to the new object
// both for suspended and enabled mode we need to clear the user xattr from the directory
async handle_latest_disabled_dir_content_xattr(key, fs_context, versioned_path, latest_ver_path) {
const directory_stat = await native_fs_utils.stat_ignore_enoent(fs_context, path.dirname(latest_ver_path));
const is_disabled_dir_content = directory_stat && directory_stat.xattr && directory_stat.xattr[XATTR_DIR_CONTENT];
if (is_disabled_dir_content) {
if (this._is_versioning_enabled()) {
dbg.log1('NamespaceFS._move_to_dest_version latest object is a directory object with attributes on the directory. move the xattr to the new .version file');
//this scenario happens only after moving from disabled to enabled mode or after upgrade. version-id is always null
versioned_path = versioned_path || this._get_version_path(key, NULL_VERSION_ID, true);
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
//in case of empty directory object .folder of the latest doesn't exist. use 'w' to create it if its missing
await this.set_fs_xattr_op(fs_context, versioned_path, filter_fs_xattr(directory_stat.xattr), undefined, "w");
}
await this._clear_user_xattr(fs_context, path.dirname(latest_ver_path), XATTR_USER_PREFIX);
}
}

// Comparing both device and inode number (st_dev and st_ino returned by stat)
// will tell you whether two different file names refer to the same thing.
// If so, we will return the etag and encryption info of the file_path
Expand Down Expand Up @@ -2284,10 +2314,10 @@ class NamespaceFS {
* @param {*} clear - the xattr prefix to be cleared
* @returns {Promise<void>}
*/
async set_fs_xattr_op(fs_context, file_path, set, clear) {
async set_fs_xattr_op(fs_context, file_path, set, clear, mode = config.NSFS_OPEN_READ_MODE) {
let file;
try {
file = await nb_native().fs.open(fs_context, file_path, config.NSFS_OPEN_READ_MODE,
file = await nb_native().fs.open(fs_context, file_path, mode,
native_fs_utils.get_umasked_mode(config.BASE_MODE_FILE));
await file.replacexattr(fs_context, set, clear);
await file.close(fs_context);
Expand Down Expand Up @@ -2673,6 +2703,10 @@ class NamespaceFS {
const is_dir_content = this._is_directory_content(file_path, params.key);
return is_dir_content && params.size === 0;
}

should_use_empty_content_dir_optimization() {
return this._is_versioning_disabled() || !config.NSFS_CONTENT_DIRECTORY_VERSIONING_ENABLED;
}
/**
* returns if should force md5 calculation for the bucket/account.
* first check if defined for bucket / account, if not use global default
Expand Down Expand Up @@ -2730,9 +2764,12 @@ class NamespaceFS {
}

// returns version path of the form bucket_path/dir/.versions/{key}_{version_id}
_get_version_path(key, version_id) {
const key_version = path.basename(key) + (version_id ? '_' + version_id : '');
return path.normalize(path.join(this.bucket_path, path.dirname(key), HIDDEN_VERSIONS_PATH, key_version));
// in case of directory, the path is bucket_path/dir/key/.versions/.folder_{version_id}
_get_version_path(key, version_id, is_dir = false) {
const key_name = is_dir ? config.NSFS_FOLDER_OBJECT_NAME : path.basename(key);
const dir_name = is_dir ? key : path.dirname(key);
const key_version = key_name + (version_id ? '_' + version_id : '');
return path.normalize(path.join(this.bucket_path, dir_name, HIDDEN_VERSIONS_PATH, key_version));
}

// this function returns the following version information -
Expand Down
1 change: 1 addition & 0 deletions src/test/unit_tests/coretest.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ config.test_mode = true;
config.NODES_FREE_SPACE_RESERVE = 10 * 1024 * 1024;
config.NSFS_VERSIONING_ENABLED = true;
config.OBJECT_SDK_BUCKET_CACHE_EXPIRY_MS = 1;
config.NSFS_CONTENT_DIRECTORY_VERSIONING_ENABLED = true;

config.ROOT_KEY_MOUNT = '/tmp/noobaa-server/root_keys';

Expand Down
2 changes: 2 additions & 0 deletions src/test/unit_tests/jest_tests/test_nsfs_concurrency.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const buffer_utils = require('../../../util/buffer_utils');
const { TMP_PATH } = require('../../system_tests/test_utils');
const { crypto_random_string } = require('../../../util/string_utils');
const endpoint_stats_collector = require('../../../sdk/endpoint_stats_collector');
const config = require('../../../../config');
config.NSFS_VERSIONING_ENABLED = true;

function make_dummy_object_sdk(nsfs_config, uid, gid) {
return {
Expand Down
27 changes: 27 additions & 0 deletions src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,33 @@ describe('test versioning concurrency', () => {
});
expect(num_versions).toBe(num_copies);
}, TEST_TIMEOUT);

it('content dir multiple puts of the same key - enabled', async () => {
    const bucket = 'bucket-directory';
    const key = 'key-s/';

    // upload an empty content dir in disabled mode first, so the enabled mode puts below
    // run against a directory object that was created while versioning was disabled
    nsfs.versioning = 'DISABLED';
    await nsfs.upload_object({ bucket: bucket, key: key, size: 0 }, DUMMY_OBJECT_SDK);

    nsfs.versioning = 'ENABLED';
    const failed_operations = [];
    const successful_operations = [];
    const num_of_concurrency = 10;
    const uploads = [];
    for (let i = 0; i < num_of_concurrency; i++) {
        const random_data = Buffer.from(String(i));
        const body = buffer_utils.buffer_to_read_stream(random_data);
        // use the two-handler form of then() so a failed upload is recorded only as a failure.
        // chaining .catch().then() would also push the recovered value into successful_operations
        uploads.push(
            nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK)
                .then(res => successful_operations.push(res), err => failed_operations.push(err))
        );
    }
    // wait for every upload to settle instead of sleeping for a fixed time
    await Promise.all(uploads);
    expect(successful_operations).toHaveLength(num_of_concurrency);
    expect(failed_operations).toHaveLength(0);
    const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);
    //TODO should be num_of_concurrency + 1 (the null version). list-object-version currently ignores the latest .folder file
    expect(versions.objects.length).toBe(num_of_concurrency);
}, TEST_TIMEOUT);
});

/**
Expand Down
3 changes: 2 additions & 1 deletion src/test/unit_tests/nc_coretest.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ async function config_dir_setup() {
NC_RELOAD_CONFIG_INTERVAL: 1,
// DO NOT CHANGE - setting VACCUM_ANALYZER_INTERVAL=1 needed for failing the tests
// in case where vaccumAnalyzer is being called before setting process.env.NC_NSFS_NO_DB_ENV = 'true' on nsfs.js
VACCUM_ANALYZER_INTERVAL: 1
VACCUM_ANALYZER_INTERVAL: 1,
NSFS_CONTENT_DIRECTORY_VERSIONING_ENABLED: true
}));
await fs.promises.mkdir(FIRST_BUCKET_PATH, { recursive: true });
}
Expand Down
Loading

0 comments on commit 85ff011

Please sign in to comment.