Skip to content

Commit cd242fd

Browse files
Add event callbacks into listener for upper layer (#657)
some events need to be handled by upper layer. This PR add three event: 1 fetch_data: upper layer can decide which data to be returned 2 no_space_left: this error should be handled by upper layer if necessary 3 on_log_replay_done: after log replay is done and before joining raft group, upper layer might do something
1 parent 08567d8 commit cd242fd

File tree

6 files changed

+100
-59
lines changed

6 files changed

+100
-59
lines changed

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomestoreConan(ConanFile):
1111
name = "homestore"
12-
version = "6.6.25"
12+
version = "6.7.0"
1313

1414
homepage = "https://github.com/eBay/Homestore"
1515
description = "HomeStore Storage Engine"

src/include/homestore/replication/repl_dev.h

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <sisl/grpc/generic_service.hpp>
1212
#include <sisl/grpc/rpc_client.hpp>
1313
#include <homestore/replication/repl_decls.h>
14+
#include <homestore/blkdata_service.hpp>
1415
#include <libnuraft/snapshot.hxx>
1516

1617
namespace nuraft {
@@ -367,6 +368,25 @@ class ReplDevListener {
367368
/// @brief Free up user-defined context inside the snapshot_obj that is allocated during read_snapshot_obj.
368369
virtual void free_user_snp_ctx(void*& user_snp_ctx) = 0;
369370

371+
/// @brief ask upper layer to decide which data should be returned.
372+
// @param header - header of the log entry.
373+
// @param blkid - original blkid of the log entry
374+
// @param sgs - sgs to be filled with data
375+
// @param lsn - lsn of the log entry
376+
virtual folly::Future< std::error_code > on_fetch_data(const int64_t lsn, const sisl::blob& header,
377+
const MultiBlkId& blkid, sisl::sg_list& sgs) {
378+
// default implementation is reading by blkid directly
379+
return data_service().async_read(blkid, sgs, sgs.size);
380+
}
381+
382+
/// @brief ask upper layer to handle no_space_left event
383+
virtual folly::Future< std::error_code > on_no_space_left(uint32_t pdev_id, chunk_num_t chunk_id) {
384+
return folly::makeFuture< std::error_code >(std::error_code{});
385+
}
386+
387+
/// @brief when restart, after all the logs are replayed and before joining raft group, notify the upper layer
388+
virtual void on_log_replay_done(const group_id_t& group_id){};
389+
370390
private:
371391
std::weak_ptr< ReplDev > m_repl_dev;
372392
};
@@ -449,7 +469,7 @@ class ReplDev {
449469
/// @brief Clean up resources on this repl dev.
450470
virtual void purge() = 0;
451471

452-
virtual std::shared_ptr<snapshot_context> deserialize_snapshot_context(sisl::io_blob_safe &snp_ctx) = 0;
472+
virtual std::shared_ptr< snapshot_context > deserialize_snapshot_context(sisl::io_blob_safe& snp_ctx) = 0;
453473

454474
virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); }
455475

@@ -460,6 +480,8 @@ class ReplDev {
460480
}
461481
}
462482

483+
virtual shared< ReplDevListener > get_listener() { return m_listener; }
484+
463485
// we have no shutdown for repl_dev, since shutdown repl_dev is done by repl_service
464486
void stop() {
465487
start_stopping();

src/lib/replication/repl_dev/raft_repl_dev.cpp

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,15 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const&
335335
handle_error(rreq, ReplServiceError::DATA_DUPLICATED);
336336
return;
337337
}
338+
339+
#ifdef _PRERELEASE
340+
if (iomgr_flip::instance()->test_flip("disable_leader_push_data")) {
341+
RD_LOGD("Simulating push data failure, so that all the follower will have to fetch data");
342+
} else
343+
push_data_to_all_followers(rreq, data);
344+
#else
338345
push_data_to_all_followers(rreq, data);
346+
#endif
339347

340348
COUNTER_INCREMENT(m_metrics, total_write_cnt, 1);
341349
COUNTER_INCREMENT(m_metrics, outstanding_data_write_cnt, 1);
@@ -794,30 +802,8 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
794802
auto const& lsn = req->lsn();
795803
auto const& originator = req->blkid_originator();
796804
auto const& remote_blkid = req->remote_blkid();
797-
798-
// Edit this check if in the future we want to fetch from non-originator;
799-
if (originator != server_id()) {
800-
auto const error_msg =
801-
fmt::format("Did not expect to receive fetch data from "
802-
"remote when I am not the originator of this request, originator={}, my_server_id={}",
803-
originator, server_id());
804-
RD_LOGW("{}", error_msg);
805-
auto status = ::grpc::Status(::grpc::INVALID_ARGUMENT, error_msg);
806-
rpc_data->set_status(status);
807-
rpc_data->send_response();
808-
return;
809-
}
810-
811-
// fetch data based on the remote_blkid
812-
// We are the originator of the blkid, read data locally;
813805
MultiBlkId local_blkid;
814-
815-
// convert remote_blkid serialized data to local blkid
816806
local_blkid.deserialize(sisl::blob{remote_blkid->Data(), remote_blkid->size()}, true /* copy */);
817-
818-
RD_LOGD("Data Channel: FetchData received: dsn={} lsn={} my_blkid={}", req->dsn(), lsn,
819-
local_blkid.to_string());
820-
821807
// prepare the sgs data buffer to read into;
822808
auto const total_size = local_blkid.blk_count() * get_blk_size();
823809
sisl::sg_list sgs;
@@ -827,7 +813,18 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
827813

828814
// accumulate the sgs for later use (send back to the requester));
829815
sgs_vec.push_back(sgs);
830-
futs.emplace_back(async_read(local_blkid, sgs, total_size));
816+
817+
if (originator != server_id()) {
818+
RD_LOGD("non-originator FetchData received: dsn={} lsn={} originator={}, my_server_id={}", req->dsn(), lsn,
819+
originator, server_id());
820+
} else {
821+
RD_LOGD("Data Channel: FetchData received: dsn={} lsn={}", req->dsn(), lsn);
822+
}
823+
824+
auto const& header = req->user_header();
825+
sisl::blob user_header = sisl::blob{header->Data(), header->size()};
826+
RD_LOGD("Data Channel: FetchData handled, my_blkid={}", local_blkid.to_string());
827+
futs.emplace_back(std::move(m_listener->on_fetch_data(lsn, user_header, local_blkid, sgs)));
831828
}
832829

833830
folly::collectAllUnsafe(futs).thenValue(
@@ -1238,10 +1235,10 @@ void RaftReplDev::save_config(const nuraft::cluster_config& config) {
12381235

12391236
void RaftReplDev::save_state(const nuraft::srv_state& state) {
12401237
std::unique_lock lg{m_config_mtx};
1241-
(*m_raft_config_sb)["state"] = nlohmann::json{
1242-
{"term", state.get_term()}, {"voted_for", state.get_voted_for()},
1243-
{"election_timer_allowed", state.is_election_timer_allowed()}, {"catching_up", state.is_catching_up()}
1244-
};
1238+
(*m_raft_config_sb)["state"] = nlohmann::json{{"term", state.get_term()},
1239+
{"voted_for", state.get_voted_for()},
1240+
{"election_timer_allowed", state.is_election_timer_allowed()},
1241+
{"catching_up", state.is_catching_up()}};
12451242
m_raft_config_sb.write();
12461243
RD_LOGI("Saved state {}", (*m_raft_config_sb)["state"].dump());
12471244
}
@@ -1251,17 +1248,16 @@ nuraft::ptr< nuraft::srv_state > RaftReplDev::read_state() {
12511248
auto& js = *m_raft_config_sb;
12521249
auto state = nuraft::cs_new< nuraft::srv_state >();
12531250
if (js["state"].empty()) {
1254-
js["state"] = nlohmann::json{
1255-
{"term", state->get_term()}, {"voted_for", state->get_voted_for()},
1256-
{"election_timer_allowed", state->is_election_timer_allowed()},
1257-
{"catching_up", state->is_catching_up()}
1258-
};
1251+
js["state"] = nlohmann::json{{"term", state->get_term()},
1252+
{"voted_for", state->get_voted_for()},
1253+
{"election_timer_allowed", state->is_election_timer_allowed()},
1254+
{"catching_up", state->is_catching_up()}};
12591255
} else {
12601256
try {
12611257
state->set_term(uint64_cast(js["state"]["term"]));
12621258
state->set_voted_for(static_cast< int >(js["state"]["voted_for"]));
1263-
state->allow_election_timer(static_cast<bool>(js["state"]["election_timer_allowed"]));
1264-
state->set_catching_up(static_cast<bool>(js["state"]["catching_up"]));
1259+
state->allow_election_timer(static_cast< bool >(js["state"]["election_timer_allowed"]));
1260+
state->set_catching_up(static_cast< bool >(js["state"]["catching_up"]));
12651261
} catch (std::out_of_range const&) {
12661262
LOGWARN("State data was not in the expected format [group_id={}]!", m_group_id)
12671263
}

src/lib/replication/repl_dev/raft_repl_dev.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,22 +115,22 @@ struct ReplDevCPContext {
115115

116116
class nuraft_snapshot_context : public snapshot_context {
117117
public:
118-
nuraft_snapshot_context(nuraft::snapshot &snp) : snapshot_context(snp.get_last_log_idx()) {
118+
nuraft_snapshot_context(nuraft::snapshot& snp) : snapshot_context(snp.get_last_log_idx()) {
119119
auto snp_buf = snp.serialize();
120120
snapshot_ = nuraft::snapshot::deserialize(*snp_buf);
121121
}
122122

123-
nuraft_snapshot_context(sisl::io_blob_safe const &snp_ctx) : snapshot_context(0) { deserialize(snp_ctx); }
123+
nuraft_snapshot_context(sisl::io_blob_safe const& snp_ctx) : snapshot_context(0) { deserialize(snp_ctx); }
124124

125125
sisl::io_blob_safe serialize() override {
126126
// Dump the context from nuraft buffer to the io blob.
127127
auto snp_buf = snapshot_->serialize();
128-
sisl::io_blob_safe blob{s_cast<size_t>(snp_buf->size())};
128+
sisl::io_blob_safe blob{s_cast< size_t >(snp_buf->size())};
129129
std::memcpy(blob.bytes(), snp_buf->data_begin(), snp_buf->size());
130130
return blob;
131131
}
132132

133-
void deserialize(const sisl::io_blob_safe &snp_ctx) {
133+
void deserialize(const sisl::io_blob_safe& snp_ctx) {
134134
// Load the context from the io blob to nuraft buffer.
135135
auto snp_buf = nuraft::buffer::alloc(snp_ctx.size());
136136
snp_buf->put_raw(snp_ctx.cbytes(), snp_ctx.size());
@@ -139,10 +139,10 @@ class nuraft_snapshot_context : public snapshot_context {
139139
lsn_ = snapshot_->get_last_log_idx();
140140
}
141141

142-
nuraft::ptr<nuraft::snapshot> nuraft_snapshot() { return snapshot_; }
142+
nuraft::ptr< nuraft::snapshot > nuraft_snapshot() { return snapshot_; }
143143

144144
private:
145-
nuraft::ptr<nuraft::snapshot> snapshot_;
145+
nuraft::ptr< nuraft::snapshot > snapshot_;
146146
};
147147

148148
class RaftReplDev : public ReplDev,
@@ -236,12 +236,12 @@ class RaftReplDev : public ReplDev,
236236
m_data_journal->purge_all_logs();
237237
}
238238

239-
std::shared_ptr<snapshot_context> deserialize_snapshot_context(sisl::io_blob_safe &snp_ctx) override {
240-
return std::make_shared<nuraft_snapshot_context>(snp_ctx);
239+
std::shared_ptr< snapshot_context > deserialize_snapshot_context(sisl::io_blob_safe& snp_ctx) override {
240+
return std::make_shared< nuraft_snapshot_context >(snp_ctx);
241241
}
242242

243243
//////////////// Accessor/shortcut methods ///////////////////////
244-
nuraft_mesg::repl_service_ctx *group_msg_service();
244+
nuraft_mesg::repl_service_ctx* group_msg_service();
245245

246246
nuraft::raft_server* raft_server();
247247
RaftReplDevMetrics& metrics() { return m_metrics; }

src/lib/replication/service/raft_repl_service.cpp

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,16 +156,21 @@ void RaftReplService::start() {
156156
LOGINFO("Starting DataService");
157157
hs()->data_service().start();
158158

159-
// Step 6: Iterate all the repl dev and ask each one of the join the raft group.
160-
for (auto it = m_rd_map.begin(); it != m_rd_map.end();) {
161-
auto rdev = std::dynamic_pointer_cast< RaftReplDev >(it->second);
162-
rdev->wait_for_logstore_ready();
163-
if (!rdev->join_group()) {
164-
HS_REL_ASSERT(false, "FAILED TO JOIN GROUP, PANIC HERE");
165-
it = m_rd_map.erase(it);
166-
} else {
167-
++it;
168-
}
159+
// Step 6: Iterate all the repl devs and ask each one of them to join the raft group concurrently.
160+
std::vector< std::future< bool > > join_group_futures;
161+
for (const auto& [_, repl_dev] : m_rd_map) {
162+
join_group_futures.emplace_back(std::async(std::launch::async, [&repl_dev]() {
163+
auto rdev = std::dynamic_pointer_cast< RaftReplDev >(repl_dev);
164+
rdev->wait_for_logstore_ready();
165+
166+
// upper layer can register a callback to be notified when log replay is done.
167+
if (auto listener = rdev->get_listener(); listener) listener->on_log_replay_done(rdev->group_id());
168+
return rdev->join_group();
169+
}));
170+
}
171+
172+
for (auto& future : join_group_futures) {
173+
if (!future.get()) HS_REL_ASSERT(false, "FAILED TO JOIN GROUP, PANIC HERE");
169174
}
170175

171176
// Step 7: Register to CPManager to ensure we can flush the superblk.

src/tests/test_raft_repl_dev.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ TEST_F(RaftReplDevTest, Write_Duplicated_Data) {
3333
stored_key = dbs_[0]->inmem_db_.cbegin()->first;
3434
ASSERT_EQ(id, stored_key.id_);
3535
} else {
36-
LOGINFO("I am not leader, leader_uuid={} my_uuid={}, do nothing",
37-
boost::uuids::to_string(leader_uuid), boost::uuids::to_string(g_helper->my_replica_id()));
36+
LOGINFO("I am not leader, leader_uuid={} my_uuid={}, do nothing", boost::uuids::to_string(leader_uuid),
37+
boost::uuids::to_string(g_helper->my_replica_id()));
3838
}
3939
wait_for_commits(total_writes);
4040

@@ -45,12 +45,12 @@ TEST_F(RaftReplDevTest, Write_Duplicated_Data) {
4545
if duplication found in leader proposal, reject it;
4646
if duplication found in the followers, skip it.
4747
*/
48-
//1. write the same data again on leader, should fail
48+
// 1. write the same data again on leader, should fail
4949
if (leader_uuid == g_helper->my_replica_id()) {
5050
auto err = this->write_with_id(id, true /* wait_for_commit */);
5151
ASSERT_EQ(ReplServiceError::DATA_DUPLICATED, err);
5252

53-
//2. delete it from the db to simulate duplication in followers(skip the duplication check in leader side)
53+
// 2. delete it from the db to simulate duplication in followers(skip the duplication check in leader side)
5454
dbs_[0]->inmem_db_.erase(stored_key);
5555
LOGINFO("data with id={} has been deleted from db", id);
5656
err = this->write_with_id(id, true /* wait_for_commit */);
@@ -109,6 +109,24 @@ TEST_F(RaftReplDevTest, Follower_Fetch_OnActive_ReplicaGroup) {
109109

110110
g_helper->sync_for_cleanup_start();
111111
}
112+
113+
TEST_F(RaftReplDevTest, Write_With_Diabled_Leader_Push_Data) {
114+
g_helper->set_basic_flip("disable_leader_push_data");
115+
LOGINFO("Homestore replica={} setup completed, all the push_data from leader are disabled",
116+
g_helper->replica_num());
117+
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
118+
g_helper->sync_for_test_start();
119+
120+
this->write_on_leader(100, true /* wait_for_commit */);
121+
122+
g_helper->sync_for_verify_start();
123+
124+
LOGINFO("Validate all data written so far by reading them");
125+
this->validate_data();
126+
127+
g_helper->sync_for_cleanup_start();
128+
}
129+
112130
#endif
113131

114132
// do some io before restart;

0 commit comments

Comments
 (0)