From 676a4fd406dc98eb8a40a0069e3a7b2d5f7b29c9 Mon Sep 17 00:00:00 2001 From: Hanqing Wu Date: Sun, 10 Dec 2023 16:06:21 +0800 Subject: [PATCH 1/5] curvefs(metaserver): Use DoublyBufferedData to replace read-write locks to manage copysets Signed-off-by: Hanqing Wu --- curvefs/src/metaserver/copyset/copyset_node.h | 6 + .../copyset/copyset_node_manager.cpp | 223 +++++++++++------- .../metaserver/copyset/copyset_node_manager.h | 43 ++-- 3 files changed, 175 insertions(+), 97 deletions(-) diff --git a/curvefs/src/metaserver/copyset/copyset_node.h b/curvefs/src/metaserver/copyset/copyset_node.h index b1a6f14760..3f895a3ed8 100644 --- a/curvefs/src/metaserver/copyset/copyset_node.h +++ b/curvefs/src/metaserver/copyset/copyset_node.h @@ -102,6 +102,8 @@ class CopysetNode : public braft::StateMachine { virtual PeerId GetLeaderId() const; + GroupId GetGroupId() const; + MetaStore* GetMetaStore() const; virtual uint64_t GetConfEpoch() const; @@ -344,6 +346,10 @@ inline bool CopysetNode::IsLoading() const { return isLoading_.load(std::memory_order_acquire); } +inline GroupId CopysetNode::GetGroupId() const { + return groupId_; +} + } // namespace copyset } // namespace metaserver } // namespace curvefs diff --git a/curvefs/src/metaserver/copyset/copyset_node_manager.cpp b/curvefs/src/metaserver/copyset/copyset_node_manager.cpp index a3ceeb5b04..75786135db 100644 --- a/curvefs/src/metaserver/copyset/copyset_node_manager.cpp +++ b/curvefs/src/metaserver/copyset/copyset_node_manager.cpp @@ -32,58 +32,48 @@ #include "curvefs/src/metaserver/copyset/utils.h" #include "src/common/timeutility.h" +#include "src/common/concurrent/generic_name_lock.h" + namespace curvefs { namespace metaserver { namespace copyset { using ::curve::common::TimeUtility; +using NameLockGuard = curve::common::GenericNameLockGuard; bool CopysetNodeManager::IsLoadFinished() const { return loadFinished_.load(std::memory_order_acquire); } -bool CopysetNodeManager::DeleteCopysetNodeInternal(PoolId poolId, - CopysetId copysetId, - bool removeData) { - GroupId groupId = ToGroupId(poolId, copysetId); - - // stop copyset node first - { - ReadLockGuard lock(lock_); - auto it = copysets_.find(groupId); - if (it != copysets_.end()) { - it->second->Stop(); - } else { - LOG(WARNING) << "Delete copyset failed, copyset " - << ToGroupIdString(poolId, copysetId) << " not found"; - return false; - } +bool CopysetNodeManager::DeleteCopysetNodeInternalLocked(PoolId poolId, + CopysetId copysetId, + bool removeData) { + auto node = GetSharedCopysetNode(poolId, copysetId); + if (node == nullptr) { + LOG(WARNING) << "Delete copyset failed, copyset: " + << ToGroupIdString(poolId, copysetId) << " not found"; + return false; } - // remove copyset node - { - WriteLockGuard lock(lock_); - auto it = copysets_.find(groupId); - if (it != copysets_.end()) { - bool ret = true; - if (removeData) { - std::string copysetDataDir = it->second->GetCopysetDataDir(); - if (!trash_.RecycleCopyset(copysetDataDir)) { - LOG(WARNING) << "Recycle copyset remote data failed, " - "copyset data path: '" - << copysetDataDir << "'"; - ret = false; - } - } - - copysets_.erase(it); - LOG(INFO) << "Delete copyset " << ToGroupIdString(poolId, copysetId) - << " success"; - return ret; + node->Stop(); + LOG(INFO) << "Copyset " << ToGroupIdString(poolId, copysetId) << " stopped"; + + bool ret = true; + if (removeData) { + std::string copysetDataDir = node->GetCopysetDataDir(); + if (!trash_.RecycleCopyset(copysetDataDir)) { + LOG(WARNING) << "Recycle copyset remove data failed, " + "copyset data path: '" + << copysetDataDir << "'"; + ret = false; } } - return false; + // delete node + copysets_.Modify(RemoveCopysetNode, node); + LOG(INFO) << "Delete copyset " << ToGroupIdString(poolId, copysetId) + << " success"; + return ret; } bool CopysetNodeManager::Init(const CopysetNodeOptions& options) { @@ -107,11 +97,11 @@ bool CopysetNodeManager::Start() { loadFinished_.store(true, std::memory_order_release); LOG(INFO) << "Reload copysets success"; return true; - } else { - running_.store(false, std::memory_order_release); - LOG(ERROR) << "Reload copysets failed"; - return false; } + + running_.store(false, std::memory_order_release); + LOG(ERROR) << "Reload copysets failed"; + return false; } bool CopysetNodeManager::Stop() { @@ -123,16 +113,24 @@ bool CopysetNodeManager::Stop() { loadFinished_.store(false); { - ReadLockGuard lock(lock_); - for (auto& copyset : copysets_) { + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(ERROR) << "Fail to get copyset nodes"; + return false; + } + + for (const auto& copyset : *ptr) { copyset.second->Stop(); } } - { - WriteLockGuard lock(lock_); - copysets_.clear(); - } + auto clear = [](CopysetNodeMap& map) -> size_t { + map.clear(); + return 1; + }; + + // clear copysets + copysets_.Modify(clear); if (!trash_.Stop()) { LOG(ERROR) << "Stop trash failed"; @@ -146,54 +144,105 @@ bool CopysetNodeManager::Stop() { CopysetNode* CopysetNodeManager::GetCopysetNode(PoolId poolId, CopysetId copysetId) { - ReadLockGuard lock(lock_); + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(poolId, copysetId); + return nullptr; + } - auto it = copysets_.find(ToGroupId(poolId, copysetId)); - if (it != copysets_.end()) { + auto it = ptr->find(ToGroupId(poolId, copysetId)); + if (it != ptr->end()) { return it->second.get(); } + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(poolId, copysetId); return nullptr; } std::shared_ptr CopysetNodeManager::GetSharedCopysetNode( PoolId poolId, CopysetId copysetId) { - ReadLockGuard lock(lock_); + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(poolId, copysetId); + return nullptr; + } - auto it = copysets_.find(ToGroupId(poolId, copysetId)); - if (it != copysets_.end()) { + auto it = ptr->find(ToGroupId(poolId, copysetId)); + if (it != ptr->end()) { return it->second; } + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(poolId, copysetId); return nullptr; } int CopysetNodeManager::IsCopysetNodeExist( const CreateCopysetRequest::Copyset& copyset) { - ReadLockGuard lock(lock_); - auto iter = copysets_.find(ToGroupId(copyset.poolid(), - copyset.copysetid())); - if (iter == copysets_.end()) { + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(copyset.poolid(), copyset.copysetid()); + return 0; + } + + auto iter = ptr->find(ToGroupId(copyset.poolid(), copyset.copysetid())); + if (iter == ptr->end()) { return 0; - } else { - auto copysetNode = iter->second.get(); - std::vector peers; - copysetNode->ListPeers(&peers); - if (peers.size() != static_cast(copyset.peers_size())) { + } + + auto* copysetNode = iter->second.get(); + std::vector peers; + copysetNode->ListPeers(&peers); + if (peers.size() != static_cast(copyset.peers_size())) { + return -1; + } + + for (int i = 0; i < copyset.peers_size(); i++) { + const auto& cspeer = copyset.peers(i); + auto iter = + std::find_if(peers.begin(), peers.end(), [&cspeer](const Peer& p) { + return cspeer.address() == p.address(); + }); + if (iter == peers.end()) { return -1; } + } - for (int i = 0; i < copyset.peers_size(); i++) { - const auto& cspeer = copyset.peers(i); - auto iter = std::find_if(peers.begin(), peers.end(), - [&cspeer](const Peer& p) { - return cspeer.address() == p.address(); - }); - if (iter == peers.end()) { - return -1; - } - } + return 1; +} + +size_t CopysetNodeManager::AddCopysetNode( + CopysetNodeMap& map, const std::shared_ptr& copysetNode) { + assert(copysetNode != nullptr); + + auto groupId = copysetNode->GetGroupId(); + auto it = map.find(groupId); + if (it != map.end()) { + LOG(WARNING) << "Copyset node already exists: " << groupId; + return 0; } + + auto ret = map.emplace(groupId, copysetNode); + CHECK(ret.second); + return 1; +} + +size_t CopysetNodeManager::RemoveCopysetNode( + CopysetNodeMap& map, const std::shared_ptr& copysetNode) { + assert(copysetNode != nullptr); + + auto groupId = copysetNode->GetGroupId(); + auto it = map.find(groupId); + if (it == map.end()) { + LOG(WARNING) << "Copyset node not found: " << groupId; + return 0; + } + + map.erase(it); return 1; } @@ -209,9 +258,11 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId, braft::GroupId groupId = ToGroupId(poolId, copysetId); std::shared_ptr copysetNode; + NameLockGuard guard(copysetLifetimeNameLock_, groupId); + { - WriteLockGuard lock(lock_); - if (copysets_.count(groupId) != 0) { + auto* exist = GetCopysetNode(poolId, copysetId); + if (exist != nullptr) { LOG(WARNING) << "Copyset node already exists: " << ToGroupIdString(poolId, copysetId); return false; @@ -225,7 +276,7 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId, return false; } - copysets_.emplace(groupId, copysetNode); + copysets_.Modify(AddCopysetNode, copysetNode); } // node start maybe time-consuming @@ -234,7 +285,7 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId, // restart, in this case we should not automaticaly remove copyset's // data const bool removeData = checkLoadFinish; - DeleteCopysetNodeInternal(poolId, copysetId, removeData); + DeleteCopysetNodeInternalLocked(poolId, copysetId, removeData); LOG(ERROR) << "Copyset " << ToGroupIdString(poolId, copysetId) << " start failed"; return false; @@ -246,10 +297,16 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId, } void CopysetNodeManager::GetAllCopysets( - std::vector *nodes) const { + std::vector* nodes) const { nodes->clear(); - ReadLockGuard lock(lock_); - for (auto& copyset : copysets_) { + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(WARNING) << "Fail to get all copysets"; + return; + } + + nodes->reserve(ptr->size()); + for (const auto& copyset : *ptr) { nodes->push_back(copyset.second.get()); } } @@ -266,11 +323,15 @@ void CopysetNodeManager::AddService(brpc::Server* server, } bool CopysetNodeManager::DeleteCopysetNode(PoolId poolId, CopysetId copysetId) { - return DeleteCopysetNodeInternal(poolId, copysetId, false); + GroupId groupId = ToGroupId(poolId, copysetId); + NameLockGuard guard(copysetLifetimeNameLock_, groupId); + return DeleteCopysetNodeInternalLocked(poolId, copysetId, false); } bool CopysetNodeManager::PurgeCopysetNode(PoolId poolId, CopysetId copysetId) { - return DeleteCopysetNodeInternal(poolId, copysetId, true); + GroupId groupId = ToGroupId(poolId, copysetId); + NameLockGuard guard(copysetLifetimeNameLock_, groupId); + return DeleteCopysetNodeInternalLocked(poolId, copysetId, true); } } // namespace copyset diff --git a/curvefs/src/metaserver/copyset/copyset_node_manager.h b/curvefs/src/metaserver/copyset/copyset_node_manager.h index 41b58dde67..a0f23a4df6 100644 --- a/curvefs/src/metaserver/copyset/copyset_node_manager.h +++ b/curvefs/src/metaserver/copyset/copyset_node_manager.h @@ -29,10 +29,14 @@ #include #include +#include "butil/containers/doubly_buffered_data.h" + #include "curvefs/src/metaserver/common/types.h" #include "curvefs/src/metaserver/copyset/copyset_node.h" #include "curvefs/src/metaserver/copyset/types.h" +#include "src/common/concurrent/generic_name_lock.h" + namespace curvefs { namespace metaserver { namespace copyset { @@ -79,12 +83,7 @@ class CopysetNodeManager { virtual bool IsLoadFinished() const; public: - CopysetNodeManager() - : options_(), - running_(false), - loadFinished_(false), - lock_(), - copysets_() {} + CopysetNodeManager() = default; public: /** @@ -92,28 +91,40 @@ class CopysetNodeManager { */ void AddService(brpc::Server* server, const butil::EndPoint& listenAddr); - private: - bool DeleteCopysetNodeInternal(PoolId poolId, CopysetId copysetId, - bool removeData); - private: using CopysetNodeMap = std::unordered_map>; + bool DeleteCopysetNodeInternalLocked(PoolId poolId, CopysetId copysetId, + bool removeData); + + // Add copyset node to copyset map. + // Return 1 if success, 0 otherwise + static size_t AddCopysetNode( + CopysetNodeMap& map, // NOLINT(runtime/references) + const std::shared_ptr& copysetNode); + + // Remove copyset node from copyset map. + // Return 1 if success, 0 otherwise + static size_t RemoveCopysetNode( + CopysetNodeMap& map, // NOLINT(runtime/references) + const std::shared_ptr& copysetNode); + + private: CopysetNodeOptions options_; - std::atomic running_; + std::atomic running_{false}; // whether copyset is loaded finished, manager will reject create copyset // request if load unfinished - std::atomic loadFinished_; + std::atomic loadFinished_{false}; - // protected copysets_ - mutable RWLock lock_; + CopysetTrash trash_; - CopysetNodeMap copysets_; + mutable butil::DoublyBufferedData copysets_; - CopysetTrash trash_; + // copyset name lock + curve::common::GenericNameLock copysetLifetimeNameLock_; }; } // namespace copyset From 1d2d16a4110cc416ebbe764d619cd294299732ea Mon Sep 17 00:00:00 2001 From: Hanqing Wu Date: Sun, 10 Dec 2023 21:01:02 +0800 Subject: [PATCH 2/5] Revert "Fix metaserver deadlock caused by bthread coroutine switching" This reverts commit 48014b53f030989e7420aa66adab45ff94bcd7a1. Signed-off-by: Hanqing Wu --- curvefs/src/metaserver/partition.cpp | 97 ++++++++++++---------------- src/common/concurrent/rw_lock.h | 58 ++--------------- 2 files changed, 46 insertions(+), 109 deletions(-) diff --git a/curvefs/src/metaserver/partition.cpp b/curvefs/src/metaserver/partition.cpp index f942000c3b..872a836d1f 100644 --- a/curvefs/src/metaserver/partition.cpp +++ b/curvefs/src/metaserver/partition.cpp @@ -29,7 +29,6 @@ #include #include #include -#include #include "curvefs/proto/metaserver.pb.h" #include "curvefs/src/metaserver/copyset/copyset_node_manager.h" @@ -537,17 +536,8 @@ MetaStatusCode Partition::GetAllBlockGroup( } void Partition::StartS3Compact() { - // register s3 compaction task in a separate thread, since the caller may - // holds a pthread wrlock when calling this function, and create `S3Compact` - // will acquire a bthread rwlock, may cause thread switching, thus causing a - // deadlock. - // FIXME(wuhanqing): handle it in a more elegant way - auto handle = std::async(std::launch::async, [this]() { - S3CompactManager::GetInstance().Register( - S3Compact{inodeManager_, partitionInfo_}); - }); - - handle.wait(); + S3CompactManager::GetInstance().Register( + S3Compact{inodeManager_, partitionInfo_}); } void Partition::CancelS3Compact() { @@ -555,50 +545,45 @@ void Partition::CancelS3Compact() { } void Partition::StartVolumeDeallocate() { - // FIXME(wuhanqing): same as `StartS3Compact` - auto handle = std::async(std::launch::async, [this]() { - FsInfo fsInfo; - bool ok = FsInfoManager::GetInstance().GetFsInfo(partitionInfo_.fsid(), - &fsInfo); - if (!ok) { - LOG(ERROR) << "Partition start volume deallocate fail, get fsinfo " - "fail. fsid=" - << partitionInfo_.fsid(); - return; - } - - if (!fsInfo.detail().has_volume()) { - LOG(INFO) << "Partition not belong to volume, do not need start " - "deallocate. partitionInfo=" - << partitionInfo_.DebugString(); - return; - } - - VolumeDeallocateCalOption calOpt; - calOpt.kvStorage = kvStorage_; - calOpt.inodeStorage = inodeStorage_; - calOpt.nameGen = nameGen_; - auto copysetNode = - copyset::CopysetNodeManager::GetInstance().GetSharedCopysetNode( - partitionInfo_.poolid(), partitionInfo_.copysetid()); - if (copysetNode == nullptr) { - LOG(ERROR) << "Partition get copyset node failed. poolid=" - << partitionInfo_.poolid() - << ", copysetid=" << partitionInfo_.copysetid(); - return; - } - - InodeVolumeSpaceDeallocate task( - partitionInfo_.fsid(), partitionInfo_.partitionid(), copysetNode); - task.Init(calOpt); - - VolumeDeallocateManager::GetInstance().Register(std::move(task)); - - VLOG(3) << "Partition start volume deallocate success. partitionInfo=" - << partitionInfo_.DebugString(); - }); - - handle.wait(); + FsInfo fsInfo; + bool ok = + FsInfoManager::GetInstance().GetFsInfo(partitionInfo_.fsid(), &fsInfo); + if (!ok) { + LOG(ERROR) + << "Partition start volume deallocate fail, get fsinfo fail. fsid=" + << partitionInfo_.fsid(); + return; + } + + if (!fsInfo.detail().has_volume()) { + LOG(INFO) << "Partition not belong to volume, do not need start " + "deallocate. partitionInfo=" + << partitionInfo_.DebugString(); + return; + } + + VolumeDeallocateCalOption calOpt; + calOpt.kvStorage = kvStorage_; + calOpt.inodeStorage = inodeStorage_; + calOpt.nameGen = nameGen_; + auto copysetNode = + copyset::CopysetNodeManager::GetInstance().GetSharedCopysetNode( + partitionInfo_.poolid(), partitionInfo_.copysetid()); + if (copysetNode == nullptr) { + LOG(ERROR) << "Partition get copyset node failed. poolid=" + << partitionInfo_.poolid() + << ", copysetid=" << partitionInfo_.copysetid(); + return; + } + + InodeVolumeSpaceDeallocate task(partitionInfo_.fsid(), + partitionInfo_.partitionid(), copysetNode); + task.Init(calOpt); + + VolumeDeallocateManager::GetInstance().Register(std::move(task)); + + VLOG(3) << "Partition start volume deallocate success. partitionInfo=" + << partitionInfo_.DebugString(); } void Partition::CancelVolumeDeallocate() { diff --git a/src/common/concurrent/rw_lock.h b/src/common/concurrent/rw_lock.h index 807afb3b8c..d7c47c7d3c 100644 --- a/src/common/concurrent/rw_lock.h +++ b/src/common/concurrent/rw_lock.h @@ -23,31 +23,13 @@ #ifndef SRC_COMMON_CONCURRENT_RW_LOCK_H_ #define SRC_COMMON_CONCURRENT_RW_LOCK_H_ +#include #include -#include #include -#include -#include // gettid +#include -#include "include/curve_compiler_specific.h" #include "src/common/uncopyable.h" -// Due to the mixed use of bthread and pthread in some cases, acquiring another -// bthread lock(mutex/rwlock) after acquiring a write lock on a pthread rwlock -// may result in switching the bthread coroutine, and then the operation of -// releasing the previous write lock in the other pthread will not take effect -// (implying that the write lock is still held), thus causing a deadlock. - -// Check pthread rwlock tid between wrlock and unlock -#if defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID) && \ - (ENABLE_CHECK_PTHREAD_WRLOCK_TID == 1) -#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1 -#elif !defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID) -#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1 -#else -#define CURVE_CHECK_PTHREAD_WRLOCK_TID 0 -#endif - namespace curve { namespace common { @@ -69,21 +51,10 @@ class PthreadRWLockBase : public RWLockBase { void WRLock() override { int ret = pthread_rwlock_wrlock(&rwlock_); CHECK(0 == ret) << "wlock failed: " << ret << ", " << strerror(ret); -#if CURVE_CHECK_PTHREAD_WRLOCK_TID - tid_ = gettid(); -#endif } int TryWRLock() override { - int ret = pthread_rwlock_trywrlock(&rwlock_); - if (CURVE_UNLIKELY(ret != 0)) { - return ret; - } - -#if CURVE_CHECK_PTHREAD_WRLOCK_TID - tid_ = gettid(); -#endif - return 0; + return pthread_rwlock_trywrlock(&rwlock_); } void RDLock() override { @@ -96,19 +67,6 @@ class PthreadRWLockBase : public RWLockBase { } void Unlock() override { -#if CURVE_CHECK_PTHREAD_WRLOCK_TID - if (tid_ != 0) { - const pid_t current = gettid(); - // If CHECK here is triggered, please look at the comments at the - // beginning of the file. - // In the meantime, the simplest solution might be to use - // `BthreadRWLock` locks everywhere. - CHECK(tid_ == current) - << ", tid has changed, previous tid: " << tid_ - << ", current tid: " << current; - tid_ = 0; - } -#endif pthread_rwlock_unlock(&rwlock_); } @@ -118,14 +76,8 @@ class PthreadRWLockBase : public RWLockBase { pthread_rwlock_t rwlock_; pthread_rwlockattr_t rwlockAttr_; - -#if CURVE_CHECK_PTHREAD_WRLOCK_TID - pid_t tid_ = 0; -#endif }; -#undef CURVE_CHECK_PTHREAD_WRLOCK_TID - class RWLock : public PthreadRWLockBase { public: RWLock() { @@ -170,7 +122,7 @@ class BthreadRWLock : public RWLockBase { } int TryWRLock() override { - LOG(WARNING) << "TryWRLock not support yet"; + // not support yet return EINVAL; } @@ -180,7 +132,7 @@ class BthreadRWLock : public RWLockBase { } int TryRDLock() override { - LOG(WARNING) << "TryRDLock not support yet"; + // not support yet return EINVAL; } From 921a7bade66881d88b34e48e2362919d66242f26 Mon Sep 17 00:00:00 2001 From: Hanqing Wu Date: Sun, 10 Dec 2023 21:03:06 +0800 Subject: [PATCH 3/5] common: Detect pthread wrlock thread switching Signed-off-by: Hanqing Wu --- src/common/concurrent/rw_lock.h | 58 ++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/src/common/concurrent/rw_lock.h b/src/common/concurrent/rw_lock.h index d7c47c7d3c..807afb3b8c 100644 --- a/src/common/concurrent/rw_lock.h +++ b/src/common/concurrent/rw_lock.h @@ -23,13 +23,31 @@ #ifndef SRC_COMMON_CONCURRENT_RW_LOCK_H_ #define SRC_COMMON_CONCURRENT_RW_LOCK_H_ -#include #include -#include #include +#include +#include +#include // gettid +#include "include/curve_compiler_specific.h" #include "src/common/uncopyable.h" +// Due to the mixed use of bthread and pthread in some cases, acquiring another +// bthread lock(mutex/rwlock) after acquiring a write lock on a pthread rwlock +// may result in switching the bthread coroutine, and then the operation of +// releasing the previous write lock in the other pthread will not take effect +// (implying that the write lock is still held), thus causing a deadlock. + +// Check pthread rwlock tid between wrlock and unlock +#if defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID) && \ + (ENABLE_CHECK_PTHREAD_WRLOCK_TID == 1) +#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1 +#elif !defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID) +#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1 +#else +#define CURVE_CHECK_PTHREAD_WRLOCK_TID 0 +#endif + namespace curve { namespace common { @@ -51,10 +69,21 @@ class PthreadRWLockBase : public RWLockBase { void WRLock() override { int ret = pthread_rwlock_wrlock(&rwlock_); CHECK(0 == ret) << "wlock failed: " << ret << ", " << strerror(ret); +#if CURVE_CHECK_PTHREAD_WRLOCK_TID + tid_ = gettid(); +#endif } int TryWRLock() override { - return pthread_rwlock_trywrlock(&rwlock_); + int ret = pthread_rwlock_trywrlock(&rwlock_); + if (CURVE_UNLIKELY(ret != 0)) { + return ret; + } + +#if CURVE_CHECK_PTHREAD_WRLOCK_TID + tid_ = gettid(); +#endif + return 0; } void RDLock() override { @@ -67,6 +96,19 @@ class PthreadRWLockBase : public RWLockBase { } void Unlock() override { +#if CURVE_CHECK_PTHREAD_WRLOCK_TID + if (tid_ != 0) { + const pid_t current = gettid(); + // If CHECK here is triggered, please look at the comments at the + // beginning of the file. + // In the meantime, the simplest solution might be to use + // `BthreadRWLock` locks everywhere. + CHECK(tid_ == current) + << ", tid has changed, previous tid: " << tid_ + << ", current tid: " << current; + tid_ = 0; + } +#endif pthread_rwlock_unlock(&rwlock_); } @@ -76,8 +118,14 @@ class PthreadRWLockBase : public RWLockBase { pthread_rwlock_t rwlock_; pthread_rwlockattr_t rwlockAttr_; + +#if CURVE_CHECK_PTHREAD_WRLOCK_TID + pid_t tid_ = 0; +#endif }; +#undef CURVE_CHECK_PTHREAD_WRLOCK_TID + class RWLock : public PthreadRWLockBase { public: RWLock() { @@ -122,7 +170,7 @@ class BthreadRWLock : public RWLockBase { } int TryWRLock() override { - // not support yet + LOG(WARNING) << "TryWRLock not support yet"; return EINVAL; } @@ -132,7 +180,7 @@ class BthreadRWLock : public RWLockBase { } int TryRDLock() override { - // not support yet + LOG(WARNING) << "TryRDLock not support yet"; return EINVAL; } From f23313293cb1e2c96d56317df7ffb5950176de85 Mon Sep 17 00:00:00 2001 From: Hanqing Wu Date: Tue, 12 Dec 2023 11:06:25 +0800 Subject: [PATCH 4/5] common: Implement bthread rwlock try rdlock Signed-off-by: Hanqing Wu --- WORKSPACE | 1 + curvefs/src/metaserver/mds/fsinfo_manager.cpp | 2 +- curvefs/src/metaserver/mds/fsinfo_manager.h | 4 +- curvefs/src/metaserver/metastore.h | 2 +- src/common/concurrent/rw_lock.h | 3 +- .../0002-Add-bthread-rwlock-try-rdlock.patch | 84 +++++++++++++++++++ 6 files changed, 91 insertions(+), 5 deletions(-) create mode 100644 thirdparties/brpc/0002-Add-bthread-rwlock-try-rdlock.patch diff --git a/WORKSPACE b/WORKSPACE index a423f1c46a..4200320eee 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -143,6 +143,7 @@ git_repository( "//:thirdparties/brpc/brpc.patch", "//:thirdparties/brpc/fix-gcc11.patch", "//:thirdparties/brpc/0001-bvar-warning-on-conflict-bvar-name.patch", + "//:thirdparties/brpc/0002-Add-bthread-rwlock-try-rdlock.patch", ], patch_args = ["-p1"], ) diff --git a/curvefs/src/metaserver/mds/fsinfo_manager.cpp b/curvefs/src/metaserver/mds/fsinfo_manager.cpp index 25b2c5ca9d..6797668d0a 100644 --- a/curvefs/src/metaserver/mds/fsinfo_manager.cpp +++ b/curvefs/src/metaserver/mds/fsinfo_manager.cpp @@ -25,7 +25,7 @@ namespace curvefs { namespace metaserver { bool FsInfoManager::GetFsInfo(uint32_t fsId, FsInfo *fsInfo) { - std::lock_guard lock(mtx_); + std::lock_guard lock(mtx_); auto iter = fsInfoMap_.find(fsId); if (iter == fsInfoMap_.end()) { auto ret = mdsClient_->GetFsInfo(fsId, fsInfo); diff --git a/curvefs/src/metaserver/mds/fsinfo_manager.h b/curvefs/src/metaserver/mds/fsinfo_manager.h index 0cf6fb84e8..5b11000a33 100644 --- a/curvefs/src/metaserver/mds/fsinfo_manager.h +++ b/curvefs/src/metaserver/mds/fsinfo_manager.h @@ -26,6 +26,8 @@ #include #include #include + +#include "bthread/mutex.h" #include "curvefs/src/client/rpcclient/mds_client.h" namespace curvefs { @@ -50,7 +52,7 @@ class FsInfoManager { std::shared_ptr mdsClient_; std::map fsInfoMap_; - std::mutex mtx_; + bthread::Mutex mtx_; }; } // namespace metaserver } // namespace curvefs diff --git a/curvefs/src/metaserver/metastore.h b/curvefs/src/metaserver/metastore.h index a13c0a4980..63480fcc48 100644 --- a/curvefs/src/metaserver/metastore.h +++ b/curvefs/src/metaserver/metastore.h @@ -352,7 +352,7 @@ class MetaStoreImpl : public MetaStore { bool ClearInternal(); private: - RWLock rwLock_; // protect partitionMap_ + curve::common::BthreadRWLock rwLock_; // protect partitionMap_ std::shared_ptr kvStorage_; std::map> partitionMap_; std::list partitionIds_; diff --git a/src/common/concurrent/rw_lock.h b/src/common/concurrent/rw_lock.h index 807afb3b8c..f602a9ba57 100644 --- a/src/common/concurrent/rw_lock.h +++ b/src/common/concurrent/rw_lock.h @@ -180,8 +180,7 @@ class BthreadRWLock : public RWLockBase { } int TryRDLock() override { - LOG(WARNING) << "TryRDLock not support yet"; - return EINVAL; + return bthread_rwlock_tryrdlock(&rwlock_); } void Unlock() override { diff --git a/thirdparties/brpc/0002-Add-bthread-rwlock-try-rdlock.patch b/thirdparties/brpc/0002-Add-bthread-rwlock-try-rdlock.patch new file mode 100644 index 0000000000..09b33fd749 --- /dev/null +++ b/thirdparties/brpc/0002-Add-bthread-rwlock-try-rdlock.patch @@ -0,0 +1,84 @@ +From 044f2fce36404727110cd5fd5bab563d76fba71a Mon Sep 17 00:00:00 2001 +From: Hanqing Wu +Date: Mon, 11 Dec 2023 18:12:24 +0800 +Subject: [PATCH] Add bthread rwlock try rdlock + +--- + src/bthread/rwlock.cpp | 28 +++++++++++++++++++++++++++ + test/bthread_brpc_rwlock_unittest.cpp | 18 +++++++++++++++++ + 2 files changed, 46 insertions(+) + +diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp +index 418f4ad0..ca0d49b0 100644 +--- a/src/bthread/rwlock.cpp ++++ b/src/bthread/rwlock.cpp +@@ -106,6 +106,29 @@ inline int rwlock_rlock(bthread_rwlock_t* rwlock) { + + } + ++inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) { ++ butil::atomic* whole = ++ (butil::atomic*)rwlock->lock_flag; ++ butil::atomic* w_wait_count = ++ (butil::atomic*)rwlock->w_wait_count; ++ ++ while (1) { ++ unsigned w = w_wait_count->load(); ++ if (w > 0) { ++ return EBUSY; ++ } ++ // FIXME!! we don't consider read_wait_count overflow yet,2^31 should be enough here ++ unsigned r = whole->load(); ++ if ((r >> 31) == 0) { ++ if (whole->compare_exchange_weak(r, r + 1)) { ++ return 0; ++ } ++ } else { ++ return EBUSY; ++ } ++ } ++} ++ + inline int rwlock_wlock(bthread_rwlock_t* rwlock) { + butil::atomic* w_wait_count = (butil::atomic*)rwlock->w_wait_count; + butil::atomic* whole = (butil::atomic*)rwlock->lock_flag; +@@ -160,4 +183,9 @@ int bthread_rwlock_unrlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_un + int bthread_rwlock_unwlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unwlock(rwlock); } + + int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unlock(rwlock); } ++ ++int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) { ++ return bthread::rwlock_tryrdlock(rwlock); ++} ++ + } +diff --git a/test/bthread_brpc_rwlock_unittest.cpp b/test/bthread_brpc_rwlock_unittest.cpp +index f5ce3fb0..345eb322 100644 +--- a/test/bthread_brpc_rwlock_unittest.cpp ++++ b/test/bthread_brpc_rwlock_unittest.cpp +@@ -195,4 +195,22 @@ TEST(RwlockTest, mix_thread_types) { + pthread_join(pthreads[i], NULL); + } + } ++ ++TEST(RWLockTest, try_rdlock_test) { ++ bthread_rwlock_t rwlock; ++ bthread_rwlock_init(&rwlock, NULL); ++ ++ ASSERT_EQ(0, bthread_rwlock_rdlock(&rwlock)); ++ ASSERT_EQ(0, bthread_rwlock_tryrdlock(&rwlock)); ++ ASSERT_EQ(0, bthread_rwlock_unlock(&rwlock)); ++ ASSERT_EQ(0, bthread_rwlock_unlock(&rwlock)); ++ ++ ASSERT_EQ(0, bthread_rwlock_wrlock(&rwlock)); ++ ASSERT_EQ(EBUSY, bthread_rwlock_tryrdlock(&rwlock)); ++ ASSERT_EQ(0, bthread_rwlock_unlock(&rwlock)); ++ ++ ASSERT_EQ(0, bthread_rwlock_tryrdlock(&rwlock)); ++ ASSERT_EQ(0, bthread_rwlock_unlock(&rwlock)); ++} ++ + } // namespace +-- +2.37.2 + From 8ffbe4b02fdc3c810a81a50b2cabc8a2492b7f6b Mon Sep 17 00:00:00 2001 From: Hanqing Wu Date: Tue, 12 Dec 2023 11:08:44 +0800 Subject: [PATCH 5/5] Revert "fix metaserver deadlock caused by bthread coroutine switching" This reverts commit d0cd3fc3b7d5b61ccd53e2f3f3f3c07b0cb8d825. Signed-off-by: Hanqing Wu --- curvefs/src/metaserver/partition.cpp | 7 +------ curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/curvefs/src/metaserver/partition.cpp b/curvefs/src/metaserver/partition.cpp index 872a836d1f..50b97a35bf 100644 --- a/curvefs/src/metaserver/partition.cpp +++ b/curvefs/src/metaserver/partition.cpp @@ -82,12 +82,7 @@ Partition::Partition(PartitionInfo partition, } if (partitionInfo_.status() != PartitionStatus::DELETING) { - auto handle = std::async(std::launch::async, [&]() { - TrashManager::GetInstance().Add( - partitionInfo_.partitionid(), trash); - }); - handle.wait(); - + TrashManager::GetInstance().Add(partitionInfo_.partitionid(), trash); if (startCompact) { StartS3Compact(); } diff --git a/curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp b/curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp index 09dc39c3dc..a3e037074c 100644 --- a/curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp +++ b/curvefs/test/metaserver/copyset/raft_cli_service2_test.cpp @@ -472,7 +472,7 @@ TEST_F(RaftCliService2Test, ChangePeerTest) { // change peer succeed { - // sleep(60); + sleep(60); ChangePeersRequest2 request; ChangePeersResponse2 response; SetRequestPoolAndCopysetId(&request);