Skip to content

Commit

Permalink
Merge pull request #7316 from liranmauda/liran-replication-step1
Browse files Browse the repository at this point in the history
Objects versions Replication - Step 1
  • Loading branch information
liranmauda authored May 31, 2023
2 parents cc97376 + f2c6ba4 commit 0e664d1
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 23 deletions.
19 changes: 6 additions & 13 deletions src/api/bucket_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -717,9 +717,7 @@ module.exports = {
method: 'PUT',
params: {
type: 'object',
required: [
'name', 'replication_policy'
],
required: ['name', 'replication_policy'],
properties: {
name: { $ref: 'common_api#/definitions/bucket_name' },
replication_policy: { $ref: '#/definitions/replication_policy' },
Expand All @@ -734,9 +732,7 @@ module.exports = {
method: 'GET',
params: {
type: 'object',
required: [
'name'
],
required: ['name'],
properties: {
name: { $ref: 'common_api#/definitions/bucket_name' },
},
Expand All @@ -753,9 +749,7 @@ module.exports = {
method: 'DELETE',
params: {
type: 'object',
required: [
'name'
],
required: ['name'],
properties: {
name: { $ref: 'common_api#/definitions/bucket_name' },
},
Expand All @@ -770,9 +764,7 @@ module.exports = {
method: 'GET',
params: {
type: 'object',
required: [
'name', 'replication_policy'
],
required: ['name', 'replication_policy'],
properties: {
name: { $ref: 'common_api#/definitions/bucket_name' },
replication_policy: { $ref: '#/definitions/replication_policy' },
Expand Down Expand Up @@ -874,7 +866,7 @@ module.exports = {
versioning: { $ref: '#/definitions/versioning' },
namespace: { $ref: '#/definitions/namespace_bucket_config' },
bucket_claim: { $ref: '#/definitions/bucket_claim' },
logging: {$ref: '#/definitions/logging'},
logging: { $ref: '#/definitions/logging' },
force_md5_etag: {
type: 'boolean'
},
Expand Down Expand Up @@ -1425,6 +1417,7 @@ module.exports = {
}
},
sync_deletions: { type: 'boolean' },
sync_versions: { type: 'boolean' },
}
}
},
Expand Down
89 changes: 83 additions & 6 deletions src/server/bg_services/replication_scanner.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ReplicationScanner {
constructor({ name, client }) {
this.name = name;
this.client = client;
this._scanner_sem = new Semaphore(config.REPLICATION_SEMAPHORE_CAP, {
this.scanner_semaphore = new Semaphore(config.REPLICATION_SEMAPHORE_CAP, {
timeout: config.REPLICATION_SEMAPHORE_TIMEOUT,
timeout_error_code: 'REPLICATION_ITEM_TIMEOUT',
verbose: true
Expand Down Expand Up @@ -88,23 +88,35 @@ class ReplicationScanner {
const cur_src_cont_token = (rule.rule_status && rule.rule_status.src_cont_token) || '';
const cur_dst_cont_token = (rule.rule_status && rule.rule_status.dst_cont_token) || '';

const { keys_sizes_map_to_copy, src_cont_token, dst_cont_token } = await this.list_buckets_and_compare(src_bucket.name,
dst_bucket.name, prefix, cur_src_cont_token, cur_dst_cont_token);
let src_cont_token;
let dst_cont_token;
let keys_sizes_map_to_copy;

// eslint-disable-next-line no-constant-condition
if (false) { //rule.sync_versions) {
dbg.log0(`We have sync_versions :)`);
// TODO:
// ({ keys_sizes_map_to_copy, src_cont_token, dst_cont_token } = await this.list_versioned_buckets_and_compare(
// src_bucket.name, dst_bucket.name, prefix, cur_src_cont_token, cur_dst_cont_token));
} else {
({ keys_sizes_map_to_copy, src_cont_token, dst_cont_token } = await this.list_buckets_and_compare(
src_bucket.name, dst_bucket.name, prefix, cur_src_cont_token, cur_dst_cont_token));
}

dbg.log1('replication_scanner: keys_sizes_map_to_copy: ', keys_sizes_map_to_copy);
let move_res;
const keys_to_copy = Object.keys(keys_sizes_map_to_copy);
if (keys_to_copy.length) {
const copy_type = replication_utils.get_copy_type();
move_res = await replication_utils.move_objects(
this._scanner_sem,
this.scanner_semaphore,
this.client,
copy_type,
src_bucket.name,
dst_bucket.name,
keys_to_copy,
);
console.log('replication_scanner: scan move_res: ', move_res);
dbg.log0(`replication_scanner: scan move_res: ${move_res}`);
}

await replication_store.update_replication_status_by_id(replication_id,
Expand Down Expand Up @@ -176,6 +188,32 @@ class ReplicationScanner {
};
}

async list_versioned_buckets_and_compare(src_bucket, dst_bucket, prefix, cur_src_cont_token, cur_dst_cont_token) {

// list source bucket
const src_version_response = await this.list_objects_versions(src_bucket, prefix, cur_src_cont_token);

const src_contents_left = this._object_grouped_by_key_and_omitted(src_version_response);
const src_cont_token = this._get_next_key_marker(src_version_response.IsTruncated, src_contents_left);

const ans = {
keys_sizes_map_to_copy: {}, //a map between the key and it size, we need it to later report the size in_get_rule_status
src_cont_token,
dst_cont_token: ''
};

// edge case 1: Object.keys(src_contents_left).length = [] , nothing to replicate
if (!Object.keys(src_contents_left).length) return ans;

//TODO: implement the get_keys_version_diff function
return {
...ans,
// if src_list cont token is empty - dst_list cont token should be empty too
//dst_cont_token: (src_cont_token && new_dst_cont_token) || ''
dst_cont_token: ''
};
}

async list_objects(bucket_name, prefix, continuation_token) {
try {
dbg.log1('replication_server list_objects: params:', bucket_name, prefix, continuation_token);
Expand All @@ -194,6 +232,43 @@ class ReplicationScanner {
}
}

// list_objects_versions will list all the objects with the versions, continuation_token is the key marker.
async list_objects_versions(bucket_name, prefix, continuation_token) {
try {
dbg.log1('replication_server list_objects_versions: params:', bucket_name, prefix, continuation_token);
const list = await this.noobaa_connection.listObjectVersions({
Bucket: bucket_name.unwrap(),
Prefix: prefix,
KeyMarker: continuation_token,
MaxKeys: Number(process.env.REPLICATION_MAX_KEYS) || 1000 // Max keys are the total of Versions + DeleteMarkers
}).promise();

return list;
} catch (err) {
dbg.error('replication_server.list_objects_versions: error:', err);
throw err;
}
}

// _object_grouped_by_key_and_omitted will return the objects grouped by key.
// If there is more than one key, it omits the last key from the object,
// In order to avoid processing incomplete list of object + version
_object_grouped_by_key_and_omitted(version_list) {
let grouped_by_key = _.groupBy(version_list.Versions, "Key");
if (version_list.IsTruncated) {
const last_key_pos = version_list.Versions.length - 1;
if (Object.keys(grouped_by_key).length > 1) {
grouped_by_key = _.omit(grouped_by_key, version_list.Versions[last_key_pos].Key);
}
}
return grouped_by_key;
}

// if the list is truncated returns the the next key marker as the last key in the omitted objects list
_get_next_key_marker(is_truncated, contents_list) {
return is_truncated ? Object.keys(contents_list)[Object.keys(contents_list).length - 1] : '';
}

// get_keys_diff finds the object keys that src bucket contains but dst bucket doesn't
// iterate all src_keys and for each if:
// case 1: src_key is lexicographically greater than the last dst_key,
Expand All @@ -214,8 +289,10 @@ class ReplicationScanner {

// case 1
if (cur_src_key > dst_keys[dst_keys.length - 1].Key) {
const src_contents_left = src_keys.slice(i);

// in next iteration we shouldn't iterate again src keys we already passed
const src_contents_left = src_keys.slice(i);

const ans = dst_next_cont_token ? { to_replicate_map, keep_listing_dst: true, src_contents_left } : {
to_replicate_map: src_contents_left.reduce((acc, cur_obj) => {
acc[cur_obj.Key] = cur_obj.Size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ module.exports = {
}
},
sync_deletions: { type: 'boolean' },
sync_versions: { type: 'boolean' },
rule_status: {
type: 'object',
required: ['src_cont_token', 'dst_cont_token', 'last_cycle_start', 'last_cycle_end'],
Expand Down
8 changes: 4 additions & 4 deletions src/server/utils/replication_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ function get_copy_type() {
return 'MIX';
}

async function move_objects(scanner_sem, client, copy_type, src_bucket_name, dst_bucket_name, keys) {
async function move_objects(scanner_semaphore, client, copy_type, src_bucket_name, dst_bucket_name, keys) {
try {
const res = await scanner_sem.surround_count(keys.length,
const res = await scanner_semaphore.surround_count(keys.length,
async () => {
try {
const res1 = await client.replication.move_objects_by_type({
Expand Down Expand Up @@ -92,9 +92,9 @@ async function move_objects(scanner_sem, client, copy_type, src_bucket_name, dst
}
}

async function delete_objects(scanner_sem, client, bucket_name, keys) {
async function delete_objects(scanner_semaphore, client, bucket_name, keys) {
try {
const res = await scanner_sem.surround_count(keys.length,
const res = await scanner_semaphore.surround_count(keys.length,
async () => {
try {
const res1 = await client.replication.delete_objects({
Expand Down

0 comments on commit 0e664d1

Please sign in to comment.