diff --git a/.vscode/launch.json b/.vscode/launch.json index 2d1add6..44530bc 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,6 +4,33 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { + "name": "rr - RaftMain", + "type": "cppdbg", + "request": "launch", + "program": "${workspaceFolder}/build/RaftMain", + "miDebuggerServerAddress": "localhost:50505", + "stopAtEntry": false, + "cwd": "${workspaceFolder}", + "environment": [], + "externalConsole": true, + "linux": { + "MIMode": "gdb", + "setupCommands": [ + { + "description": "Setup to resolve symbols", + "text": "set sysroot /", + "ignoreFailures": false + } + ] + }, + "osx": { + "MIMode": "gdb" + }, + "windows": { + "MIMode": "gdb" + } + }, { "name": "Debug - LSMTreeTest", "type": "cppdbg", @@ -84,7 +111,7 @@ "args": [ "-c", "./assets/tkvpp_config.json" - ], // Arguments to pass to the program + ], "stopAtEntry": false, // Set to true to stop at the program's entry point "cwd": "${workspaceFolder}", // Current working directory "environment": [], diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f86449..247c52c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,8 +7,8 @@ set(CMAKE_CXX_STANDARD 23) set(CMAKE_CXX_STANDARD_REQUIRED On) set(CMAKE_CXX_EXTENSIONS Off) -set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -pedantic-errors") -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic-errors") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Wextra -Wthread-safety -pedantic-errors") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wthread-safety -pedantic-errors") set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}") set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}") diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index b6979ae..c01805a 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -3,3 +3,4 @@ project(zkv) add_subdirectory(absl) add_subdirectory(embedded) +add_subdirectory(raft) diff --git a/examples/raft/main.cpp b/examples/raft/main.cpp index 0e5065e..3aeb43a 100644 --- a/examples/raft/main.cpp +++ b/examples/raft/main.cpp @@ -1,9 +1,15 @@ +#include +#include +#include #include #include #include #include #include +#include +#include +#include #include #include #include @@ -18,6 +24,8 @@ #include +#include "thread_safety.h" + #include "Raft.grpc.pb.h" #include "Raft.pb.h" @@ -73,11 +81,13 @@ class NodeClient auto requestVote(const RequestVoteRequest &request, RequestVoteResponse *response) -> bool { grpc::ClientContext context; - + context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(generateRandomTimeout())); grpc::Status status = m_stub->RequestVote(&context, request, response); if (!status.ok()) { - spdlog::error("RequestVote RPC call failed"); + spdlog::error("RequestVote RPC call failed. Error code={} and message={}", + static_cast(status.error_code()), + status.error_message()); return false; } @@ -98,25 +108,99 @@ class NodeClient /*grpc::CompletionQueue m_cq;*/ }; -class ConsensusModule : public RaftService::Service +class ConsensusModule : public RaftService::Service, std::enable_shared_from_this { public: // @id is the ID of the current node. Order of RaftServices in @replicas is important! 
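+    // Node @id listens on replicas[@id - 1]; every other address is wrapped in a NodeClient peer stub.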
     ConsensusModule(const ID id, std::vector<IP> replicas)
-        : m_id{id}
+        : m_id{id},
+          m_currentTerm{0}
     {
         assert(m_id > 0);
-        assert(m_replicas.size() > 0);
+        assert(replicas.size() > 0);
+        assert(m_id <= replicas.size());
+
+        m_ip = replicas[m_id - 1];
+
+        grpc::ServerBuilder builder;
+        builder.AddListeningPort(m_ip, grpc::InsecureServerCredentials());
+        builder.RegisterService(this);
+
+        m_server = builder.BuildAndStart();
 
         for (auto [id, ip] : std::ranges::views::enumerate(replicas))
         {
-            m_replicas.emplace(id, NodeClient(id, ip));
+            if (id + 1 == m_id)
+            {
+                continue;
+            }
+
+            m_replicas.emplace(id + 1, NodeClient(id + 1, ip));
         }
 
         m_nextIndex.resize(m_replicas.size());
         m_matchIndex.resize(m_replicas.size());
     }
 
+    auto AppendEntries(grpc::ServerContext        *pContext,
+                       const AppendEntriesRequest *pRequest,
+                       AppendEntriesResponse      *pResponse) -> grpc::Status override
+    {
+        (void)pContext;
+        (void)pRequest;
+        (void)pResponse;
+
+        spdlog::info(
+            "Received AppendEntries RPC from leader={} during term={}", pRequest->senderid(), pRequest->term());
+
+        absl::MutexLock locker(&m_electionMutex);
+        pResponse->set_term(m_currentTerm);
+        pResponse->set_success(true);
+        pResponse->set_responderid(m_id);
+
+        m_leaderHeartbeatReceived.store(true);
+
+        return grpc::Status::OK;
+    }
+
+    auto RequestVote(grpc::ServerContext *pContext, const RequestVoteRequest *pRequest, RequestVoteResponse *pResponse)
+        -> grpc::Status override
+    {
+        (void)pContext;
+
+        spdlog::info(
+            "Received RequestVote RPC from candidate={} during term={}", pRequest->candidateid(), pRequest->term());
+
+        absl::WriterMutexLock locker(&m_electionMutex);
+        if (pRequest->term() > m_currentTerm)
+        {
+            becomeFollower(pRequest->term());
+        }
+
+        if (pRequest->term() < m_currentTerm)
+        {
+            pResponse->set_term(m_currentTerm);
+            pResponse->set_votegranted(0);
+            pResponse->set_responderid(m_id);
+
+            return grpc::Status::OK;
+        }
+
+        if (m_votedFor == 0 || m_votedFor == pRequest->candidateid())
+        {
+            m_votedFor = pRequest->candidateid();
+            m_currentTerm = pRequest->term();
+
+            pResponse->set_term(m_currentTerm);
+            pResponse->set_votegranted(1);
+            pResponse->set_responderid(m_id);
+
+            return grpc::Status::OK;
+        }
+
+        return grpc::Status::OK;
+    }
+
     auto init() -> bool
     {
         if (!initializePersistentState())
@@ -136,7 +220,57 @@ class ConsensusModule : public RaftService::Service
 
     void start()
     {
-        m_electionTimerThread = std::jthread(&ConsensusModule::electionTimeoutTask, this);
+        m_electionTimerThread = std::jthread(
+            [this](std::stop_token token)
+            {
+                while (!m_stopElectionTimer)
+                {
+                    if (token.stop_requested())
+                    {
+                        spdlog::info("Stopping election timer thread");
+                        return;
+                    }
+
+                    if (m_state == NodeState::LEADER)
+                    {
+                        spdlog::info("Skipping the election loop as the node is the leader");
+                        return;
+                    }
+
+                    absl::WriterMutexLock locker(&m_timerMutex);
+                    if (m_timerMutex.AwaitWithTimeout(absl::Condition(
+                                                          +[](std::atomic<bool> *leaderHeartbeatReceived)
+                                                          { return leaderHeartbeatReceived->load(); },
+                                                          &m_leaderHeartbeatReceived),
+                                                      absl::Milliseconds(m_electionTimeout.load())))
+                    {
+                        {
+                            absl::MutexLock locker(&m_electionMutex);
+                            spdlog::info("node={} received heartbeat during term={}", m_id, m_currentTerm);
+                        }
+
+                        m_leaderHeartbeatReceived.store(false);
+                    }
+                    else
+                    {
+                        startElection();
+                    }
+                }
+            });
+
+        /*m_serverThread = std::jthread(*/
+        /*[this](std::stop_token token)*/
+        {
+            assert(m_server);
+            spdlog::info("Listening for RPC requests on {}", m_ip);
+            m_server->Wait();
+        }
+        /*);*/
+    }
+
+    void startServer()
+    {
+        m_server->Wait();
     }
 
     void stop()
@@ -145,6 +279,9 @@ class ConsensusModule : public RaftService::Service
         m_timerCV.notify_all();
         m_electionTimerThread.request_stop();
         m_electionTimerThread.join();
+
+        m_serverThread.request_stop();
+        m_serverThread.join();
     }
 
   private:
@@ -180,76 +317,84 @@ class ConsensusModule : public RaftService::Service
     // Called every time 'AppendEntries' received.
     void resetElectionTimer()
     {
-        std::lock_guard lock(m_timerMutex);
-        m_electionTimeout = generateRandomTimeout();
+        /*m_timerMutex.AssertHeld();*/
+        m_electionTimeout.store(generateRandomTimeout());
         m_timerCV.notify_all();
     }
 
     // The logic behind election
     void startElection()
     {
-        std::lock_guard locker(m_electionMutex);
-        auto previousTerm = m_currentTerm++;
-        auto previousState = m_state;
-        m_state = NodeState::CANDIDATE;
+        RequestVoteRequest request;
+        {
+            absl::WriterMutexLock locker(&m_electionMutex);
+            m_currentTerm++;
+            m_state = NodeState::CANDIDATE;
 
-        // Node in a canditate state should vote for itself.
-        m_voteCount++;
-        m_votedFor = m_id;
+            spdlog::info("Node={} starts election. New term={}", m_id, m_currentTerm);
 
-        m_electionInProgress = true;
+            // Node in a candidate state should vote for itself.
+            m_voteCount++;
+            m_votedFor = m_id;
 
-        RequestVoteRequest request;
-        request.set_term(m_currentTerm);
-        request.set_candidateid(m_id);
-        request.set_lastlogterm(getLastLogTerm());
-        request.set_lastlogindex(getLastLogIndex());
+            m_electionInProgress = true;
+
+            request.set_term(m_currentTerm);
+            request.set_candidateid(m_id);
+            request.set_lastlogterm(getLastLogTerm());
+            request.set_lastlogindex(getLastLogIndex());
+        }
 
         std::vector<std::jthread> requesterThreads;
         for (auto &[id, client] : m_replicas)
         {
+            /*requesterThreads.emplace_back(*/
+            /*[request, this](NodeClient &client)*/
+            {
+                RequestVoteResponse response;
+                if (!client.requestVote(request, &response))
+                {
+                    spdlog::error("RequestVote RPC failed in requester thread");
+                    return;
+                }
 
-            requesterThreads
-                .emplace_back(
-                    [request, this](NodeClient &client)
-                    {
-                        RequestVoteResponse response;
-                        if (!client.requestVote(request, &response))
-                        {
-                            spdlog::error("RequestVote RPC failed in requester thread");
-                        }
+                auto responseTerm = response.term();
+                auto voteGranted = response.votegranted();
 
-                        auto responseTerm = response.term();
-                        auto voteGranted = response.votegranted();
+                spdlog::info(
+                    "Received RequestVoteResponse in requester thread peerTerm={} voteGranted={} responseTerm={}",
+                    responseTerm,
+                    voteGranted,
+                    response.responderid());
 
-                        spdlog::info("Received RequestVoteResponse in requester thread peerTerm={} voteGranted={}",
-                                     responseTerm,
-                                     voteGranted);
+                absl::MutexLock locker(&m_electionMutex);
+                if (responseTerm > m_currentTerm)
+                {
+                    becomeFollower(responseTerm);
+                    return;
+                }
 
-                        std::lock_guard locker(m_electionMutex);
-                        if (responseTerm > m_currentTerm)
-                        {
-                            becomeFollower(responseTerm);
-                            return;
-                        }
+                if ((voteGranted != 0) && responseTerm == m_currentTerm)
+                {
+                    m_voteCount++;
+                    if (hasMajority(m_voteCount.load()))
+                    {
+                        becomeLeader();
+                    }
+                }
+            }
+            /*, std::ref(client));*/
+        }
 
-                        if (voteGranted && responseTerm == m_currentTerm)
-                        {
-                            m_voteCount++;
-                            if (hasMajority(m_voteCount.load()))
-                            {
-                                becomeLeader();
-                            }
-                        }
-                    },
-                    std::ref(client))
-                .detach();
+        for (auto &thread : requesterThreads)
+        {
+            thread.join();
         }
     }
 
     void becomeFollower(const uint32_t newTerm)
     {
-        std::lock_guard locker(m_electionMutex);
+        m_electionMutex.AssertHeld();
 
         m_currentTerm = newTerm;
         m_state = NodeState::FOLLOWER;
@@ -263,39 +408,43 @@ class ConsensusModule : public RaftService::Service
     }
 
     // A task to monitor the election timeout and start a new election if needed.
-    void electionTimeoutTask(std::stop_token token)
-    {
-        while (!m_stopElectionTimer)
-        {
-            if (token.stop_requested())
-            {
-                return;
-            }
-
-            std::unique_lock lock(m_timerMutex);
-            if (m_timerCV.wait_for(lock,
-                                   std::chrono::milliseconds(m_electionTimeout),
-                                   [this]() { return m_leaderHeartbeatReceived.load(); }))
-            {
-                m_leaderHeartbeatReceived.store(false);
-            }
-            else
-            {
-                startElection();
-            }
-        }
-    }
-
+    /*void electionTimeoutTask(std::stop_token token)*/
+    /*{*/
+    /*    while (!m_stopElectionTimer)*/
+    /*    {*/
+    /*        if (token.stop_requested())*/
+    /*        {*/
+    /*            return;*/
+    /*        }*/
+    /**/
+    /*        std::unique_lock lock(m_timerMutex);*/
+    /*        if (m_timerCV.wait_for(lock,*/
+    /*                               std::chrono::milliseconds(m_electionTimeout),*/
+    /*                               [this]() { return m_leaderHeartbeatReceived.load(); }))*/
+    /*        {*/
+    /*            m_leaderHeartbeatReceived.store(false);*/
+    /*        }*/
+    /*        else*/
+    /*        {*/
+    /*            startElection();*/
+    /*        }*/
+    /*    }*/
+    /*}*/
+    /**/
     auto hasMajority(const uint32_t votes) const -> bool
     {
-        return votes > m_replicas.size() / 2;
+        return votes > static_cast<double>(m_replicas.size()) / 2.0;
    }
 
     void becomeLeader()
     {
+        m_electionMutex.AssertHeld();
+
         m_state = NodeState::LEADER;
         m_electionInProgress = false;
+        spdlog::info("Node={} became leader at term={}", m_id, m_currentTerm);
+
         for (auto &[id, client] : m_replicas)
         {
             sendHeartbeat(client);
@@ -312,12 +461,13 @@ class ConsensusModule : public RaftService::Service
     {
         AppendEntriesRequest request;
         {
-            std::lock_guard locker(m_electionMutex);
+            absl::ReaderMutexLock locker(&m_electionMutex);
 
             request.set_term(m_currentTerm);
             request.set_prevlogterm(getLastLogTerm());
             request.set_prevlogindex(getLastLogIndex());
             request.set_leadercommit(m_commitIndex);
+            request.set_senderid(m_id);
         }
 
         {
@@ -331,12 +481,14 @@
             auto responseTerm = response.term();
             auto success = response.success();
 
-            spdlog::info("Received RequestVoteResponse in requester thread peerTerm={} success={}",
-                         responseTerm,
-                         success);
+            spdlog::info(
+                "Received AppendEntriesResponse in requester thread peerTerm={} success={} responderId={}",
+                responseTerm,
+                success,
+                response.responderid());
 
             {
-                std::lock_guard locker(m_electionMutex);
+                absl::WriterMutexLock locker(&m_electionMutex);
 
                 if (responseTerm > m_currentTerm)
                 {
@@ -359,6 +511,7 @@
     void decrementNextIndex(ID id)
     {
+        (void)id;
     }
 
     [[nodiscard]] auto getLastLogIndex() const -> uint32_t
@@ -371,24 +524,28 @@
         return m_log.empty() ? 0 : m_log.back().term();
     }
 
-    void revertToFollower(uint32_t newTerm)
-    {
-    }
-
     void appendEntriesRPC()
     {
     }
 
     // Id of the current node. Received from outside.
-    uint32_t m_id{invalidId};
+    ID m_id{invalidId};
+
+    // IP of the current node. Received from outside.
+    IP m_ip;
+
+    // gRPC server for the current node
+    std::unique_ptr<grpc::Server> m_server;
 
     // Each server starts as a follower.
     NodeState m_state{NodeState::FOLLOWER};
 
+    absl::Mutex m_electionMutex;
+
     // Persistent state on all servers
-    uint32_t m_currentTerm{0};
-    uint32_t m_votedFor{0};
-    std::vector<LogEntry> m_log{};
+    uint32_t m_currentTerm ABSL_GUARDED_BY(m_electionMutex);
+    uint32_t m_votedFor{0};
+    std::vector<LogEntry> m_log;
 
     // Volatile state on all servers. Reseted on each server start.
     uint32_t m_commitIndex{0};
@@ -401,19 +558,53 @@ class ConsensusModule : public RaftService::Service
     // Election and election timer related fields.
std::atomic m_leaderHeartbeatReceived{false}; - std::mutex m_timerMutex; + absl::Mutex m_timerMutex; std::condition_variable m_timerCV; std::atomic m_stopElectionTimer{false}; - int m_electionTimeout{0}; + std::atomic m_electionTimeout{0}; std::jthread m_electionTimerThread; - std::mutex m_electionMutex; std::atomic m_voteCount{0}; std::atomic m_electionInProgress{false}; + + std::jthread m_serverThread; }; -int main() +int main(int argc, char *argv[]) { - ConsensusModule cm; + cxxopts::Options options("raft"); + options.add_options()("id", "id of the node", cxxopts::value())( + "nodes", "ip addresses of replicas in a correct order", cxxopts::value>()); + + auto parsedOptions = options.parse(argc, argv); + if ((parsedOptions.count("help") != 0U) || (parsedOptions.count("id") == 0U) || + (parsedOptions.count("nodes") == 0U)) + { + spdlog::info("{}", options.help()); + return EXIT_SUCCESS; + } + + auto nodeId = parsedOptions["id"].as(); + auto nodeIps = parsedOptions["nodes"].as>(); + for (auto ip : nodeIps) + { + spdlog::info(ip); + } + + spdlog::info("nodeIps.size()={}", nodeIps.size()); + + if (nodeId == 0) + { + spdlog::error("ID of the node should be positve integer"); + return EXIT_FAILURE; + } + + if (nodeIps.empty()) + { + spdlog::error("List of node IPs can't be empty"); + return EXIT_FAILURE; + } + + ConsensusModule cm(nodeId, nodeIps); if (!cm.init()) { spdlog::error("Failed to initialize the state machine"); diff --git a/examples/raft/thread_safety.h b/examples/raft/thread_safety.h new file mode 100644 index 0000000..d9287d3 --- /dev/null +++ b/examples/raft/thread_safety.h @@ -0,0 +1,281 @@ +#ifndef THREAD_SAFETY_ANALYSIS_MUTEX_H +#define THREAD_SAFETY_ANALYSIS_MUTEX_H + +// Enable thread safety attributes only with clang. +// The attributes can be safely erased when compiling with other compilers. +#if defined(__clang__) && (!defined(SWIG)) +#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x)) +#else +#define THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op +#endif + +#define CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x)) + +#define SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable) + +#define GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x)) + +#define PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x)) + +#define ACQUIRED_BEFORE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__)) + +#define ACQUIRED_AFTER(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__)) + +#define REQUIRES(...) THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__)) + +#define REQUIRES_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__)) + +#define ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__)) + +#define ACQUIRE_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__)) + +#define RELEASE(...) THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__)) + +#define RELEASE_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__)) + +#define RELEASE_GENERIC(...) THREAD_ANNOTATION_ATTRIBUTE__(release_generic_capability(__VA_ARGS__)) + +#define TRY_ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__)) + +#define TRY_ACQUIRE_SHARED(...) THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__)) + +#define EXCLUDES(...) 
THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__)) + +#define ASSERT_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x)) + +#define ASSERT_SHARED_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x)) + +#define RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x)) + +#define NO_THREAD_SAFETY_ANALYSIS THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis) + +// Defines an annotated interface for mutexes. +// These methods can be implemented to use any internal mutex implementation. +// class CAPABILITY("mutex") Mutex +// { +// public: +// // Acquire/lock this mutex exclusively. Only one thread can have exclusive +// // access at any one time. Write operations to guarded data require an +// // exclusive lock. +// void Lock() ACQUIRE(); + +// // Acquire/lock this mutex for read operations, which require only a shared +// // lock. This assumes a multiple-reader, single writer semantics. Multiple +// // threads may acquire the mutex simultaneously as readers, but a writer +// // must wait for all of them to release the mutex before it can acquire it +// // exclusively. +// void ReaderLock() ACQUIRE_SHARED(); + +// // Release/unlock an exclusive mutex. +// void Unlock() RELEASE(); + +// // Release/unlock a shared mutex. +// void ReaderUnlock() RELEASE_SHARED(); + +// // Generic unlock, can unlock exclusive and shared mutexes. +// void GenericUnlock() RELEASE_GENERIC(); + +// // Try to acquire the mutex. Returns true on success, and false on failure. +// bool TryLock() TRY_ACQUIRE(true); + +// // Try to acquire the mutex for read operations. +// bool ReaderTryLock() TRY_ACQUIRE_SHARED(true); + +// // Assert that this mutex is currently held by the calling thread. +// void AssertHeld() ASSERT_CAPABILITY(this); + +// // Assert that is mutex is currently held for read operations. +// void AssertReaderHeld() ASSERT_SHARED_CAPABILITY(this); + +// // For negative capabilities. +// const Mutex &operator!() const +// { +// return *this; +// } +// }; + +// // Tag types for selecting a constructor. +// struct adopt_lock_t +// { +// } inline constexpr adopt_lock = {}; +// struct defer_lock_t +// { +// } inline constexpr defer_lock = {}; +// struct shared_lock_t +// { +// } inline constexpr shared_lock = {}; + +// // MutexLocker is an RAII class that acquires a mutex in its constructor, and +// // releases it in its destructor. +// class SCOPED_CAPABILITY MutexLocker +// { +// private: +// Mutex *mut; +// bool locked; + +// public: +// // Acquire mu, implicitly acquire *this and associate it with mu. +// MutexLocker(Mutex *mu) ACQUIRE(mu) +// : mut(mu), +// locked(true) +// { +// mu->Lock(); +// } + +// // Assume mu is held, implicitly acquire *this and associate it with mu. +// MutexLocker(Mutex *mu, adopt_lock_t) REQUIRES(mu) +// : mut(mu), +// locked(true) +// { +// } + +// // Acquire mu in shared mode, implicitly acquire *this and associate it with mu. +// MutexLocker(Mutex *mu, shared_lock_t) ACQUIRE_SHARED(mu) +// : mut(mu), +// locked(true) +// { +// mu->ReaderLock(); +// } + +// // Assume mu is held in shared mode, implicitly acquire *this and associate it with mu. +// MutexLocker(Mutex *mu, adopt_lock_t, shared_lock_t) REQUIRES_SHARED(mu) +// : mut(mu), +// locked(true) +// { +// } + +// // Assume mu is not held, implicitly acquire *this and associate it with mu. +// MutexLocker(Mutex *mu, defer_lock_t) EXCLUDES(mu) +// : mut(mu), +// locked(false) +// { +// } + +// // Same as constructors, but without tag types. 
(Requires C++17 copy elision.) +// static MutexLocker Lock(Mutex *mu) ACQUIRE(mu) +// { +// return MutexLocker(mu); +// } + +// static MutexLocker Adopt(Mutex *mu) REQUIRES(mu) +// { +// return MutexLocker(mu, adopt_lock); +// } + +// static MutexLocker ReaderLock(Mutex *mu) ACQUIRE_SHARED(mu) +// { +// return MutexLocker(mu, shared_lock); +// } + +// static MutexLocker AdoptReaderLock(Mutex *mu) REQUIRES_SHARED(mu) +// { +// return MutexLocker(mu, adopt_lock, shared_lock); +// } + +// static MutexLocker DeferLock(Mutex *mu) EXCLUDES(mu) +// { +// return MutexLocker(mu, defer_lock); +// } + +// // Release *this and all associated mutexes, if they are still held. +// // There is no warning if the scope was already unlocked before. +// ~MutexLocker() RELEASE() +// { +// if (locked) +// mut->GenericUnlock(); +// } + +// // Acquire all associated mutexes exclusively. +// void Lock() ACQUIRE() +// { +// mut->Lock(); +// locked = true; +// } + +// // Try to acquire all associated mutexes exclusively. +// bool TryLock() TRY_ACQUIRE(true) +// { +// return locked = mut->TryLock(); +// } + +// // Acquire all associated mutexes in shared mode. +// void ReaderLock() ACQUIRE_SHARED() +// { +// mut->ReaderLock(); +// locked = true; +// } + +// // Try to acquire all associated mutexes in shared mode. +// bool ReaderTryLock() TRY_ACQUIRE_SHARED(true) +// { +// return locked = mut->ReaderTryLock(); +// } + +// // Release all associated mutexes. Warn on double unlock. +// void Unlock() RELEASE() +// { +// mut->Unlock(); +// locked = false; +// } + +// // Release all associated mutexes. Warn on double unlock. +// void ReaderUnlock() RELEASE() +// { +// mut->ReaderUnlock(); +// locked = false; +// } +// }; + +#ifdef USE_LOCK_STYLE_THREAD_SAFETY_ATTRIBUTES +// The original version of thread safety analysis the following attribute +// definitions. These use a lock-based terminology. They are still in use +// by existing thread safety code, and will continue to be supported. + +// Deprecated. +#define PT_GUARDED_VAR THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_var) + +// Deprecated. +#define GUARDED_VAR THREAD_ANNOTATION_ATTRIBUTE__(guarded_var) + +// Replaced by REQUIRES +#define EXCLUSIVE_LOCKS_REQUIRED(...) THREAD_ANNOTATION_ATTRIBUTE__(exclusive_locks_required(__VA_ARGS__)) + +// Replaced by REQUIRES_SHARED +#define SHARED_LOCKS_REQUIRED(...) THREAD_ANNOTATION_ATTRIBUTE__(shared_locks_required(__VA_ARGS__)) + +// Replaced by CAPABILITY +#define LOCKABLE THREAD_ANNOTATION_ATTRIBUTE__(lockable) + +// Replaced by SCOPED_CAPABILITY +#define SCOPED_LOCKABLE THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable) + +// Replaced by ACQUIRE +#define EXCLUSIVE_LOCK_FUNCTION(...) THREAD_ANNOTATION_ATTRIBUTE__(exclusive_lock_function(__VA_ARGS__)) + +// Replaced by ACQUIRE_SHARED +#define SHARED_LOCK_FUNCTION(...) THREAD_ANNOTATION_ATTRIBUTE__(shared_lock_function(__VA_ARGS__)) + +// Replaced by RELEASE and RELEASE_SHARED +#define UNLOCK_FUNCTION(...) THREAD_ANNOTATION_ATTRIBUTE__(unlock_function(__VA_ARGS__)) + +// Replaced by TRY_ACQUIRE +#define EXCLUSIVE_TRYLOCK_FUNCTION(...) THREAD_ANNOTATION_ATTRIBUTE__(exclusive_trylock_function(__VA_ARGS__)) + +// Replaced by TRY_ACQUIRE_SHARED +#define SHARED_TRYLOCK_FUNCTION(...) THREAD_ANNOTATION_ATTRIBUTE__(shared_trylock_function(__VA_ARGS__)) + +// Replaced by ASSERT_CAPABILITY +#define ASSERT_EXCLUSIVE_LOCK(...) THREAD_ANNOTATION_ATTRIBUTE__(assert_exclusive_lock(__VA_ARGS__)) + +// Replaced by ASSERT_SHARED_CAPABILITY +#define ASSERT_SHARED_LOCK(...) 
THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_lock(__VA_ARGS__)) + +// Replaced by EXCLUDE_CAPABILITY. +#define LOCKS_EXCLUDED(...) THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__)) + +// Replaced by RETURN_CAPABILITY +#define LOCK_RETURNED(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x)) + +#endif // USE_LOCK_STYLE_THREAD_SAFETY_ATTRIBUTES + +#endif // THREAD_SAFETY_ANALYSIS_MUTEX_H \ No newline at end of file diff --git a/lib/proto/CMakeLists.txt b/lib/proto/CMakeLists.txt index d33ec86..f867fa6 100644 --- a/lib/proto/CMakeLists.txt +++ b/lib/proto/CMakeLists.txt @@ -1,15 +1,16 @@ cmake_minimum_required(VERSION 3.25) project(zkv) -set(PROTO_FILE "${CMAKE_CURRENT_LIST_DIR}/TinyKVPP.proto") -if(NOT EXISTS "${PROTO_FILE}") - message(FATAL_ERROR "Proto file not found: ${PROTO_FILE}") +# Proto for TinyKVPP +set(TINYKVPP_PROTO_FILE "${CMAKE_CURRENT_LIST_DIR}/TinyKVPP.proto") +if(NOT EXISTS "${TINYKVPP_PROTO_FILE}") + message(FATAL_ERROR "Proto file not found: ${TINYKVPP_PROTO_FILE}") endif() set(PROTO_BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}") set(PROTO_IMPORT_DIRS "${CMAKE_CURRENT_LIST_DIR}") -add_library(TKVProtoObjects OBJECT "${PROTO_FILE}") +add_library(TKVProtoObjects OBJECT "${TINYKVPP_PROTO_FILE}") target_include_directories(TKVProtoObjects PUBLIC "$" ${protobuf_INCLUDE_DIR}) target_link_libraries(TKVProtoObjects PUBLIC protobuf::libprotobuf gRPC::grpc++) @@ -29,3 +30,33 @@ protobuf_generate( PLUGIN protoc-gen-grpc=$ PLUGIN_OPTIONS generate_mock_code=true GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc) + +# Proto for Raft +set(RAFT_PROTO_FILE "${CMAKE_CURRENT_LIST_DIR}/Raft.proto") +if(NOT EXISTS "${RAFT_PROTO_FILE}") + message(FATAL_ERROR "Proto file not found: ${RAFT_PROTO_FILE}") +endif() + +set(PROTO_BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}") +set(PROTO_IMPORT_DIRS "${CMAKE_CURRENT_LIST_DIR}") + +add_library(RaftProtoObjects OBJECT "${RAFT_PROTO_FILE}") +target_include_directories(RaftProtoObjects PUBLIC "$" ${protobuf_INCLUDE_DIR}) +target_link_libraries(RaftProtoObjects PUBLIC protobuf::libprotobuf gRPC::grpc++) + +if(NOT TARGET gRPC::grpc_cpp_plugin) + message(FATAL_ERROR "gRPC C++ plugin not found. 
Please ensure gRPC is properly installed.")
+endif()
+
+protobuf_generate(
+  TARGET RaftProtoObjects
+  IMPORT_DIRS ${PROTO_IMPORT_DIRS}
+  PROTOC_OUT_DIR "${PROTO_BINARY_DIR}"
+)
+
+protobuf_generate(
+  TARGET RaftProtoObjects
+  LANGUAGE grpc
+  PLUGIN protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
+  PLUGIN_OPTIONS generate_mock_code=true
+  GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc)
diff --git a/lib/proto/Raft.proto b/lib/proto/Raft.proto
index d431e80..e7aa550 100644
--- a/lib/proto/Raft.proto
+++ b/lib/proto/Raft.proto
@@ -29,11 +29,13 @@ message AppendEntriesRequest {
   uint32 prevLogTerm = 4;
   repeated LogEntry entries = 5;
   uint32 leaderCommit = 6;
+  uint32 senderId = 7;
 }
 
 message AppendEntriesResponse {
   uint32 term = 1;
   bool success = 2;
+  uint32 responderId = 3;
 }
 
 message RequestVoteRequest {
@@ -41,9 +43,11 @@ message RequestVoteRequest {
   uint32 candidateId = 2;
   uint32 lastLogIndex = 3;
   uint32 lastLogterm = 4;
+  uint32 senderId = 7;
 }
 
 message RequestVoteResponse {
   uint32 term = 1;
   uint32 voteGranted = 2;
+  uint32 responderId = 3;
 }
diff --git a/lib/structures/lsmtree/levels/level.cpp b/lib/structures/lsmtree/levels/level.cpp
index 8fd399b..3e4f68c 100644
--- a/lib/structures/lsmtree/levels/level.cpp
+++ b/lib/structures/lsmtree/levels/level.cpp
@@ -169,7 +169,8 @@ void level_t::merge(const segments::regular_segment::shared_ptr_t &pSegment) noe
     // Segments overlapping with input memtable
     auto overlappingSegmentsView =
         m_storage | std::views::filter(
-                        [](auto pSegment) {
+                        [](auto pSegment)
+                        {
                             return pSegment->min().value() > pSegment->min().value() ||
                                    pSegment->max().value() < pSegment->max().value();
                         });
diff --git a/run_replicas.sh b/run_replicas.sh
new file mode 100755
index 0000000..2ba4049
--- /dev/null
+++ b/run_replicas.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+./build/RaftMain --id 1 --nodes 0.0.0.0:8080,0.0.0.0:8081,0.0.0.0:8082 &> log_1.txt &
+./build/RaftMain --id 2 --nodes 0.0.0.0:8080,0.0.0.0:8081,0.0.0.0:8082 &> log_2.txt &
+./build/RaftMain --id 3 --nodes 0.0.0.0:8080,0.0.0.0:8081,0.0.0.0:8082 &> log_3.txt &
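
A note on the new -Wthread-safety flag added in CMakeLists.txt: the warnings only fire for code that carries Clang's thread-safety annotations, such as the ABSL_GUARDED_BY(m_electionMutex) attribute on m_currentTerm or the macros provided by examples/raft/thread_safety.h (the flag is Clang-specific, which is why that header compiles the attributes out under other compilers). The sketch below is illustrative only and not part of this patch; TermTracker and its members are invented names. It shows the two idioms the analysis checks: taking the lock inside the member function versus declaring the lock requirement on the signature, the compile-time counterpart of the run-time m_electionMutex.AssertHeld() calls used in becomeFollower() and becomeLeader().

#include <cstdint>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

// Hypothetical class, for illustration only.
class TermTracker
{
  public:
    // Takes the lock itself; the analysis sees the MutexLock and
    // therefore allows the write to m_term.
    void bumpTerm()
    {
        absl::MutexLock lock(&m_mutex);
        ++m_term;
    }

    // Declares the requirement instead: -Wthread-safety reports any call
    // site that cannot prove m_mutex is already held.
    void resetTerm(uint32_t newTerm) ABSL_EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
    {
        m_term = newTerm;
    }

  private:
    absl::Mutex m_mutex;
    uint32_t m_term ABSL_GUARDED_BY(m_mutex) = 0;
};

For the new "rr - RaftMain" launch configuration, the debugger attaches to a gdb server on localhost:50505; with rr that would typically be started with something like `rr replay -s 50505` after recording a run, though the exact invocation depends on the local rr setup.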