diff --git a/src/api/bucket_api.js b/src/api/bucket_api.js index 2c1abd3139..1735c6aa21 100644 --- a/src/api/bucket_api.js +++ b/src/api/bucket_api.js @@ -717,9 +717,7 @@ module.exports = { method: 'PUT', params: { type: 'object', - required: [ - 'name', 'replication_policy' - ], + required: ['name', 'replication_policy'], properties: { name: { $ref: 'common_api#/definitions/bucket_name' }, replication_policy: { $ref: '#/definitions/replication_policy' }, @@ -734,9 +732,7 @@ module.exports = { method: 'GET', params: { type: 'object', - required: [ - 'name' - ], + required: ['name'], properties: { name: { $ref: 'common_api#/definitions/bucket_name' }, }, @@ -753,9 +749,7 @@ module.exports = { method: 'DELETE', params: { type: 'object', - required: [ - 'name' - ], + required: ['name'], properties: { name: { $ref: 'common_api#/definitions/bucket_name' }, }, @@ -770,9 +764,7 @@ module.exports = { method: 'GET', params: { type: 'object', - required: [ - 'name', 'replication_policy' - ], + required: ['name', 'replication_policy'], properties: { name: { $ref: 'common_api#/definitions/bucket_name' }, replication_policy: { $ref: '#/definitions/replication_policy' }, @@ -874,7 +866,7 @@ module.exports = { versioning: { $ref: '#/definitions/versioning' }, namespace: { $ref: '#/definitions/namespace_bucket_config' }, bucket_claim: { $ref: '#/definitions/bucket_claim' }, - logging: {$ref: '#/definitions/logging'}, + logging: { $ref: '#/definitions/logging' }, force_md5_etag: { type: 'boolean' }, @@ -1425,6 +1417,7 @@ module.exports = { } }, sync_deletions: { type: 'boolean' }, + sync_versions: { type: 'boolean' }, } } }, diff --git a/src/server/bg_services/replication_scanner.js b/src/server/bg_services/replication_scanner.js index f686e3c233..a5400a76ba 100644 --- a/src/server/bg_services/replication_scanner.js +++ b/src/server/bg_services/replication_scanner.js @@ -36,7 +36,7 @@ class ReplicationScanner { constructor({ name, client }) { this.name = name; this.client = client; 
- this._scanner_sem = new Semaphore(config.REPLICATION_SEMAPHORE_CAP, { + this.scanner_semaphore = new Semaphore(config.REPLICATION_SEMAPHORE_CAP, { timeout: config.REPLICATION_SEMAPHORE_TIMEOUT, timeout_error_code: 'REPLICATION_ITEM_TIMEOUT', verbose: true @@ -88,8 +88,20 @@ class ReplicationScanner { const cur_src_cont_token = (rule.rule_status && rule.rule_status.src_cont_token) || ''; const cur_dst_cont_token = (rule.rule_status && rule.rule_status.dst_cont_token) || ''; - const { keys_sizes_map_to_copy, src_cont_token, dst_cont_token } = await this.list_buckets_and_compare(src_bucket.name, - dst_bucket.name, prefix, cur_src_cont_token, cur_dst_cont_token); + let src_cont_token; + let dst_cont_token; + let keys_sizes_map_to_copy; + + // eslint-disable-next-line no-constant-condition + if (false) { //rule.sync_versions) { + dbg.log0(`We have sync_versions :)`); + // TODO: + // ({ keys_sizes_map_to_copy, src_cont_token, dst_cont_token } = await this.list_versioned_buckets_and_compare( + // src_bucket.name, dst_bucket.name, prefix, cur_src_cont_token, cur_dst_cont_token)); + } else { + ({ keys_sizes_map_to_copy, src_cont_token, dst_cont_token } = await this.list_buckets_and_compare( + src_bucket.name, dst_bucket.name, prefix, cur_src_cont_token, cur_dst_cont_token)); + } dbg.log1('replication_scanner: keys_sizes_map_to_copy: ', keys_sizes_map_to_copy); let move_res; @@ -97,14 +109,14 @@ class ReplicationScanner { if (keys_to_copy.length) { const copy_type = replication_utils.get_copy_type(); move_res = await replication_utils.move_objects( - this._scanner_sem, + this.scanner_semaphore, this.client, copy_type, src_bucket.name, dst_bucket.name, keys_to_copy, ); - console.log('replication_scanner: scan move_res: ', move_res); + dbg.log0(`replication_scanner: scan move_res: ${move_res}`); } await replication_store.update_replication_status_by_id(replication_id, @@ -176,6 +188,32 @@ class ReplicationScanner { }; } + async list_versioned_buckets_and_compare(src_bucket, 
dst_bucket, prefix, cur_src_cont_token, cur_dst_cont_token) { + + // list source bucket + const src_version_response = await this.list_objects_versions(src_bucket, prefix, cur_src_cont_token); + + const src_contents_left = this._object_grouped_by_key_and_omitted(src_version_response); + const src_cont_token = this._get_next_key_marker(src_version_response.IsTruncated, src_contents_left); + + const ans = { + keys_sizes_map_to_copy: {}, //a map between the key and its size, we need it to later report the size in _get_rule_status + src_cont_token, + dst_cont_token: '' + }; + + // edge case 1: Object.keys(src_contents_left).length === 0, nothing to replicate + if (!Object.keys(src_contents_left).length) return ans; + + //TODO: implement the get_keys_version_diff function + return { + ...ans, + // if src_list cont token is empty - dst_list cont token should be empty too + //dst_cont_token: (src_cont_token && new_dst_cont_token) || '' + dst_cont_token: '' + }; + } + async list_objects(bucket_name, prefix, continuation_token) { try { dbg.log1('replication_server list_objects: params:', bucket_name, prefix, continuation_token); @@ -194,6 +232,43 @@ class ReplicationScanner { } } + + // list_objects_versions will list all the objects with the versions, continuation_token is the key marker. + async list_objects_versions(bucket_name, prefix, continuation_token) { + try { + dbg.log1('replication_server list_objects_versions: params:', bucket_name, prefix, continuation_token); + const list = await this.noobaa_connection.listObjectVersions({ + Bucket: bucket_name.unwrap(), + Prefix: prefix, + KeyMarker: continuation_token, + MaxKeys: Number(process.env.REPLICATION_MAX_KEYS) || 1000 // Max keys are the total of Versions + DeleteMarkers + }).promise(); + + return list; + } catch (err) { + dbg.error('replication_server.list_objects_versions: error:', err); + throw err; + } + } + + // _object_grouped_by_key_and_omitted will return the objects grouped by key.
+ // If there is more than one key, it omits the last key from the object, + // in order to avoid processing an incomplete list of object + version + _object_grouped_by_key_and_omitted(version_list) { + let grouped_by_key = _.groupBy(version_list.Versions, "Key"); + if (version_list.IsTruncated) { + const last_key_pos = version_list.Versions.length - 1; + if (Object.keys(grouped_by_key).length > 1) { + grouped_by_key = _.omit(grouped_by_key, version_list.Versions[last_key_pos].Key); + } + } + return grouped_by_key; + } + + // if the list is truncated, returns the next key marker as the last key in the omitted objects list + _get_next_key_marker(is_truncated, contents_list) { + return is_truncated ? Object.keys(contents_list)[Object.keys(contents_list).length - 1] : ''; + } + // get_keys_diff finds the object keys that src bucket contains but dst bucket doesn't // iterate all src_keys and for each if: // case 1: src_key is lexicographic bigger than last dst_key, @@ -214,8 +289,10 @@ // case 1 if (cur_src_key > dst_keys[dst_keys.length - 1].Key) { - const src_contents_left = src_keys.slice(i); + // in next iteration we shouldn't iterate again src keys we already passed + const src_contents_left = src_keys.slice(i); + + const ans = dst_next_cont_token ?
{ to_replicate_map, keep_listing_dst: true, src_contents_left } : { to_replicate_map: src_contents_left.reduce((acc, cur_obj) => { acc[cur_obj.Key] = cur_obj.Size; diff --git a/src/server/system_services/schemas/replication_configuration_schema.js b/src/server/system_services/schemas/replication_configuration_schema.js index cb668bbd58..5968a1859b 100644 --- a/src/server/system_services/schemas/replication_configuration_schema.js +++ b/src/server/system_services/schemas/replication_configuration_schema.js @@ -29,6 +29,7 @@ module.exports = { } }, sync_deletions: { type: 'boolean' }, + sync_versions: { type: 'boolean' }, rule_status: { type: 'object', required: ['src_cont_token', 'dst_cont_token', 'last_cycle_start', 'last_cycle_end'], diff --git a/src/server/utils/replication_utils.js b/src/server/utils/replication_utils.js index 09a7c951e4..9c775da8f4 100644 --- a/src/server/utils/replication_utils.js +++ b/src/server/utils/replication_utils.js @@ -61,9 +61,9 @@ function get_copy_type() { return 'MIX'; } -async function move_objects(scanner_sem, client, copy_type, src_bucket_name, dst_bucket_name, keys) { +async function move_objects(scanner_semaphore, client, copy_type, src_bucket_name, dst_bucket_name, keys) { try { - const res = await scanner_sem.surround_count(keys.length, + const res = await scanner_semaphore.surround_count(keys.length, async () => { try { const res1 = await client.replication.move_objects_by_type({ @@ -92,9 +92,9 @@ async function move_objects(scanner_sem, client, copy_type, src_bucket_name, dst } } -async function delete_objects(scanner_sem, client, bucket_name, keys) { +async function delete_objects(scanner_semaphore, client, bucket_name, keys) { try { - const res = await scanner_sem.surround_count(keys.length, + const res = await scanner_semaphore.surround_count(keys.length, async () => { try { const res1 = await client.replication.delete_objects({