
Commit 9feeac5

NSFS | Replace ChunkFS with FileWriter
Signed-off-by: Romy <[email protected]>
(cherry picked from commit a451243)

1 parent 2ed9b88

File tree: 9 files changed, +252 -188 lines changed


config.js

Lines changed: 2 additions & 0 deletions
@@ -821,6 +821,8 @@ config.NSFS_LOW_FREE_SPACE_MB_UNLEASH = 10 * 1024;
 // operations safely.
 config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10;
 
+config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
+
 ////////////////////////////
 // NSFS NON CONTAINERIZED //
 ////////////////////////////
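The new threshold is only declared in this commit; no consumer appears in the visible diff. A minimal sketch of how a memory threshold like this is typically consulted, with a hypothetical `should_buffer_in_memory` helper (the constant and its 8 MiB default are real, the call site is purely illustrative):

```js
'use strict';

const config = require('./config');

// Hypothetical helper - NOT part of this commit. Illustrates consulting the
// new threshold to decide whether an upload part is small enough to stage
// in memory rather than going through the large buffer pool.
function should_buffer_in_memory(part_size) {
    return part_size <= config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD; // 8 MiB by default
}

console.log(should_buffer_in_memory(4 * 1024 * 1024));  // true
console.log(should_buffer_in_memory(16 * 1024 * 1024)); // false
```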

src/sdk/namespace_fs.js

Lines changed: 16 additions & 13 deletions
@@ -18,7 +18,7 @@ const stream_utils = require('../util/stream_utils');
 const buffer_utils = require('../util/buffer_utils');
 const size_utils = require('../util/size_utils');
 const native_fs_utils = require('../util/native_fs_utils');
-const ChunkFS = require('../util/chunk_fs');
+const FileWriter = require('../util/file_writer');
 const LRUCache = require('../util/lru_cache');
 const nb_native = require('../util/nb_native');
 const RpcError = require('../rpc/rpc_error');
@@ -1483,31 +1483,34 @@ class NamespaceFS {
     // Can be finetuned further on if needed and inserting the Semaphore logic inside
     // Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
     async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) {
-        const { source_stream, copy_source } = params;
+        const { copy_source } = params;
         try {
             // Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
             const md5_enabled = config.NSFS_CALCULATE_MD5 || (this.force_md5_etag ||
                 object_sdk?.requesting_account?.force_md5_etag);
-            const chunk_fs = new ChunkFS({
+            const file_writer = new FileWriter({
                 target_file,
                 fs_context,
-                stats: this.stats,
-                namespace_resource_id: this.namespace_resource_id,
-                md5_enabled,
                 offset,
+                md5_enabled,
+                stats: this.stats,
                 bucket: params.bucket,
-                large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
+                large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size,
+                namespace_resource_id: this.namespace_resource_id,
             });
-            chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
+            file_writer.on('error', err => dbg.error('namespace_fs._upload_stream: error occured on FileWriter: ', err));
+            file_writer.on('finish', arg => dbg.log1('namespace_fs._upload_stream: finish occured on stream FileWriter: ', arg));
+            file_writer.on('close', arg => dbg.log1('namespace_fs._upload_stream: close occured on stream FileWriter: ', arg));
+
             if (copy_source) {
-                await this.read_object_stream(copy_source, object_sdk, chunk_fs);
+                await this.read_object_stream(copy_source, object_sdk, file_writer);
             } else if (params.source_params) {
-                await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
+                await params.source_ns.read_object_stream(params.source_params, object_sdk, file_writer);
             } else {
-                await stream_utils.pipeline([source_stream, chunk_fs]);
-                await stream_utils.wait_finished(chunk_fs);
+                await stream_utils.pipeline([params.source_stream, file_writer]);
+                await stream_utils.wait_finished(file_writer);
             }
-            return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes };
+            return { digest: file_writer.digest, total_bytes: file_writer.total_bytes };
         } catch (error) {
             dbg.error('_upload_stream had error: ', error);
             throw error;
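The FileWriter stream itself (src/util/file_writer.js) is not shown in this commit's visible diff. Below is a rough sketch of the shape the call site above implies — constructor options, `digest` and `total_bytes` exposed after finish, and standard Writable events — assuming a Node.js Writable that appends chunks to the target file and optionally tracks MD5. The write signature on `target_file` is a guess, not the actual NooBaa API:

```js
'use strict';

const stream = require('stream');
const crypto = require('crypto');

// Rough sketch only - NOT the actual src/util/file_writer.js, which this
// diff does not include. Mirrors the options and fields used by the
// _upload_stream call site above.
class FileWriterSketch extends stream.Writable {
    constructor({ target_file, fs_context, offset = 0, md5_enabled = false }) {
        super();
        this.target_file = target_file;   // open file handle to write into
        this.fs_context = fs_context;     // uid/gid context for fs calls
        this.offset = offset;             // starting byte offset in the file
        this.total_bytes = 0;             // exposed after finish, as used above
        this.digest = undefined;          // hex MD5, set in _final if enabled
        this._md5 = md5_enabled ? crypto.createHash('md5') : undefined;
    }

    _write(chunk, encoding, callback) {
        if (this._md5) this._md5.update(chunk);
        // assumed write(fs_context, buffer, position) signature - hypothetical
        this.target_file.write(this.fs_context, chunk, this.offset + this.total_bytes)
            .then(() => {
                this.total_bytes += chunk.length;
                callback();
            })
            .catch(callback);
    }

    _final(callback) {
        if (this._md5) this.digest = this._md5.digest('hex');
        callback();
    }
}

module.exports = FileWriterSketch;
```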

src/test/unit_tests/index.js

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ require('./test_bucket_chunks_builder');
 require('./test_mirror_writer');
 require('./test_namespace_fs');
 require('./test_ns_list_objects');
-require('./test_chunk_fs');
+require('./test_file_writer');
 require('./test_namespace_fs_mpu');
 require('./test_nb_native_fs');
 require('./test_s3select');

src/test/unit_tests/nc_index.js

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ coretest.setup();
 
 require('./test_namespace_fs');
 require('./test_ns_list_objects');
-require('./test_chunk_fs');
+require('./test_file_writer');
 require('./test_namespace_fs_mpu');
 require('./test_nb_native_fs');
 require('./test_nc_nsfs_cli');

src/test/unit_tests/test_chunk_fs.js

Lines changed: 0 additions & 34 deletions
This file was deleted.
src/test/unit_tests/test_file_writer.js

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+/* Copyright (C) 2020 NooBaa */
+/* eslint-disable no-invalid-this */
+'use strict';
+
+const mocha = require('mocha');
+const config = require('../../../config');
+const file_writer_hashing = require('../../tools/file_writer_hashing');
+const orig_iov_max = config.NSFS_DEFAULT_IOV_MAX;
+
+// on iov_max small tests we need to use smaller amount of parts and chunks to ensure that the test will finish
+// in a reasonable period of time because we will flush max 1/2 buffers at a time.
+const small_iov_num_parts = 20;
+
+
+mocha.describe('FileWriter', function() {
+    const RUN_TIMEOUT = 10 * 60 * 1000;
+
+    mocha.afterEach(function() {
+        config.NSFS_DEFAULT_IOV_MAX = orig_iov_max;
+    });
+
+    mocha.it('Concurrent FileWriter with hash target', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.hash_target();
+    });
+
+    mocha.it('Concurrent FileWriter with file target', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.file_target();
+    });
+
+    mocha.it('Concurrent FileWriter with hash target - iov_max=1', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.hash_target(undefined, small_iov_num_parts, 1);
+    });
+
+    mocha.it('Concurrent FileWriter with file target - iov_max=1', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.file_target(undefined, small_iov_num_parts, 1);
+    });
+
+    mocha.it('Concurrent FileWriter with hash target - iov_max=2', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.hash_target(undefined, small_iov_num_parts, 2);
+    });
+
+    mocha.it('Concurrent FileWriter with file target - iov_max=2', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.file_target(undefined, small_iov_num_parts, 2);
+    });
+
+    mocha.it('Concurrent FileWriter with file target - produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        // The goal of this test is to produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L
+        // so we will flush buffers because of reaching max num of buffers and not because we reached the max NSFS buf size
+        // chunk size = 100, num_chunks = (10 * 1024 * 1024)/100 < 104587, 104587 = num_chunks > 1024
+        // chunk size = 100, total_chunks_size after having 1024 chunks is = 100 * 1024 < config.NSFS_BUF_SIZE_L
+        const chunk_size = 100;
+        const parts_s = 50;
+        await file_writer_hashing.file_target(chunk_size, parts_s);
+    });
+});
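The comment in that last test compresses the arithmetic; spelled out as a standalone check (the 10 MiB part size is taken from the test comment itself):

```js
// Worked check of the "flush because of chunk count, not size" scenario.
const chunk_size = 100;                          // bytes per chunk, per the test
const part_size = 10 * 1024 * 1024;              // 10 MiB, per the test comment
const num_chunks = part_size / chunk_size;       // 104857.6 -> well above 1024
const size_at_1024_chunks = chunk_size * 1024;   // 102400 bytes, below NSFS_BUF_SIZE_L
console.log({ num_chunks, size_at_1024_chunks });
```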
src/tools/file_writer_hashing.js

Lines changed: 23 additions & 28 deletions
@@ -3,7 +3,7 @@
 
 const crypto = require('crypto');
 const assert = require('assert');
-const ChunkFS = require('../util/chunk_fs');
+const FileWriter = require('../util/file_writer');
 const config = require('../../config');
 const nb_native = require('../util/nb_native');
 const stream_utils = require('../util/stream_utils');
@@ -19,7 +19,8 @@ const PARTS = Number(argv.parts) || 1000;
 const CONCURRENCY = Number(argv.concurrency) || 20;
 const CHUNK = Number(argv.chunk) || 16 * 1024;
 const PART_SIZE = Number(argv.part_size) || 20 * 1024 * 1024;
-const F_PREFIX = argv.dst_folder || '/tmp/chunk_fs_hashing/';
+const F_PREFIX = argv.dst_folder || '/tmp/file_writer_hashing/';
+const IOV_MAX = argv.iov_max || config.NSFS_DEFAULT_IOV_MAX;
 
 const DEFAULT_FS_CONFIG = {
     uid: Number(argv.uid) || process.getuid(),
@@ -28,12 +29,6 @@ const DEFAULT_FS_CONFIG = {
     warn_threshold_ms: 100,
 };
 
-const DUMMY_RPC = {
-    object: {
-        update_endpoint_stats: (...params) => null
-    }
-};
-
 const XATTR_USER_PREFIX = 'user.';
 // TODO: In order to verify validity add content_md5_mtime as well
 const XATTR_MD5_KEY = XATTR_USER_PREFIX + 'content_md5';
@@ -64,41 +59,42 @@ function assign_md5_to_fs_xattr(md5_digest, fs_xattr) {
     return fs_xattr;
 }
 
-async function hash_target() {
-    await P.map_with_concurrency(CONCURRENCY, Array(PARTS).fill(), async () => {
+async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) {
+    config.NSFS_DEFAULT_IOV_MAX = iov_max;
+    await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => {
         const data = crypto.randomBytes(PART_SIZE);
         const content_md5 = crypto.createHash('md5').update(data).digest('hex');
         // Using async generator function in order to push data in small chunks
         const source_stream = stream.Readable.from(async function*() {
-            for (let i = 0; i < data.length; i += CHUNK) {
-                yield data.slice(i, i + CHUNK);
+            for (let i = 0; i < data.length; i += chunk_size) {
+                yield data.slice(i, i + chunk_size);
             }
         }());
         const target = new TargetHash();
-        const chunk_fs = new ChunkFS({
+        const file_writer = new FileWriter({
             target_file: target,
             fs_context: DEFAULT_FS_CONFIG,
-            rpc_client: DUMMY_RPC,
             namespace_resource_id: 'MajesticSloth'
         });
-        await stream_utils.pipeline([source_stream, chunk_fs]);
-        await stream_utils.wait_finished(chunk_fs);
+        await stream_utils.pipeline([source_stream, file_writer]);
+        await stream_utils.wait_finished(file_writer);
         const write_hash = target.digest();
         console.log(
             'Hash target',
-            `NativeMD5=${chunk_fs.digest}`,
+            `NativeMD5=${file_writer.digest}`,
             `DataWriteCryptoMD5=${write_hash}`,
             `DataOriginMD5=${content_md5}`,
         );
         assert.strictEqual(content_md5, write_hash);
         if (config.NSFS_CALCULATE_MD5) {
-            assert.strictEqual(chunk_fs.digest, content_md5);
-            assert.strictEqual(chunk_fs.digest, write_hash);
+            assert.strictEqual(file_writer.digest, content_md5);
+            assert.strictEqual(file_writer.digest, write_hash);
         }
     });
 }
 
-async function file_target(chunk_size = CHUNK, parts = PARTS) {
+async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) {
+    config.NSFS_DEFAULT_IOV_MAX = iov_max;
     fs.mkdirSync(F_PREFIX);
     await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => {
         let target_file;
@@ -113,32 +109,31 @@ async function file_target(chunk_size = CHUNK, parts = PARTS) {
                 yield data.slice(i, i + chunk_size);
             }
         }());
-        const chunk_fs = new ChunkFS({
+        const file_writer = new FileWriter({
             target_file,
             fs_context: DEFAULT_FS_CONFIG,
-            rpc_client: DUMMY_RPC,
             namespace_resource_id: 'MajesticSloth'
         });
-        await stream_utils.pipeline([source_stream, chunk_fs]);
-        await stream_utils.wait_finished(chunk_fs);
+        await stream_utils.pipeline([source_stream, file_writer]);
+        await stream_utils.wait_finished(file_writer);
         if (XATTR) {
             await target_file.replacexattr(
                 DEFAULT_FS_CONFIG,
-                assign_md5_to_fs_xattr(chunk_fs.digest, {})
+                assign_md5_to_fs_xattr(file_writer.digest, {})
             );
         }
         if (FSYNC) await target_file.fsync(DEFAULT_FS_CONFIG);
         const write_hash = crypto.createHash('md5').update(fs.readFileSync(F_TARGET)).digest('hex');
         console.log(
             'File target',
-            `NativeMD5=${chunk_fs.digest}`,
+            `NativeMD5=${file_writer.digest}`,
             `DataWriteMD5=${write_hash}`,
             `DataOriginMD5=${content_md5}`,
         );
         assert.strictEqual(content_md5, write_hash);
         if (config.NSFS_CALCULATE_MD5) {
-            assert.strictEqual(chunk_fs.digest, content_md5);
-            assert.strictEqual(chunk_fs.digest, write_hash);
+            assert.strictEqual(file_writer.digest, content_md5);
+            assert.strictEqual(file_writer.digest, write_hash);
         }
         // Leave parts on error
         fs.rmSync(F_TARGET);
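The new `iov_max` parameter is how the unit tests above drive these helpers; a hypothetical direct invocation (the require path and argument values are for illustration only, matching the exports used by test_file_writer.js):

```js
'use strict';

// Hypothetical driver - assumes file_writer_hashing exports hash_target and
// file_target with the (chunk_size, parts, iov_max) signature the unit tests use.
const file_writer_hashing = require('./src/tools/file_writer_hashing');

async function main() {
    // defaults: CHUNK-sized chunks, PARTS parts, IOV_MAX from argv/config
    await file_writer_hashing.hash_target();
    // tiny chunks and fewer parts, forcing flushes on chunk count (iov_max=2)
    await file_writer_hashing.file_target(100, 50, 2);
}

main().catch(err => {
    console.error(err);
    process.exit(1);
});
```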
