From 42e421074b96f5f92edd3334b778a900d6d4268e Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Thu, 21 Mar 2024 14:16:27 -0500 Subject: [PATCH] Use `std::shared_ptr` for `gz::transport::NodeShared` (#484) Currently, `NodeShared` is never destroyed once it's created. This makes it hard for writing tests that do not interfere with each other. This patch uses a reference counted smart pointer (`std::shared_ptr`) to keep track of `NodeShared` instances, so that it gets properly destroyed when when all `Node` instances, which themselves contain `NodeShared` instances are destroyed. --------- Signed-off-by: Addisu Z. Taddese --- include/gz/transport/NodeShared.hh | 8 +++- log/src/Recorder.cc | 2 +- src/Node.cc | 12 ++--- src/NodePrivate.hh | 3 +- src/NodeShared.cc | 70 +++++++++++++++--------------- 5 files changed, 51 insertions(+), 44 deletions(-) diff --git a/include/gz/transport/NodeShared.hh b/include/gz/transport/NodeShared.hh index 56df051dc..f4d3d5e4f 100644 --- a/include/gz/transport/NodeShared.hh +++ b/include/gz/transport/NodeShared.hh @@ -68,8 +68,14 @@ namespace gz { /// \brief NodeShared is a singleton. This method gets the /// NodeShared instance shared between all the nodes. + /// Note: This is deprecated. Please use \sa SharedInstance /// \return Pointer to the current NodeShared instance. - public: static NodeShared *Instance(); + public: static NodeShared GZ_DEPRECATED(13) *Instance(); + + /// \brief NodeShared is a singleton. This method gets the + /// a reference counted NodeShared instance shared between all the nodes. + /// \return A shared_ptr to the current NodeShared instance. + public: static std::shared_ptr SharedInstance(); /// \brief Receive data and control messages. public: void RunReceptionTask(); diff --git a/log/src/Recorder.cc b/log/src/Recorder.cc index 11b83040d..bb95fe86b 100644 --- a/log/src/Recorder.cc +++ b/log/src/Recorder.cc @@ -188,7 +188,7 @@ Recorder::Implementation::Implementation() this->OnMessageReceived(_data, _len, _info); }; - auto shared = NodeShared::Instance(); + auto shared = NodeShared::SharedInstance(); this->discovery = std::make_unique( Uuid().ToString(), shared->discoveryIP, shared->msgDiscPort); diff --git a/src/Node.cc b/src/Node.cc index 4dbc29598..ec6116d2c 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -77,13 +77,13 @@ namespace gz ////////////////////////////////////////////////// int rcvHwm() { - return NodeShared::Instance()->RcvHwm(); + return NodeShared::SharedInstance()->RcvHwm(); } ////////////////////////////////////////////////// int sndHwm() { - return NodeShared::Instance()->SndHwm(); + return NodeShared::SharedInstance()->SndHwm(); } ////////////////////////////////////////////////// @@ -104,14 +104,14 @@ namespace gz { /// \brief Default constructor. public: PublisherPrivate() - : shared(NodeShared::Instance()) + : shared(NodeShared::SharedInstance()) { } /// \brief Constructor /// \param[in] _publisher The message publisher. public: explicit PublisherPrivate(const MessagePublisher &_publisher) - : shared(NodeShared::Instance()), + : shared(NodeShared::SharedInstance()), publisher(_publisher) { } @@ -189,7 +189,7 @@ namespace gz /// \brief Pointer to the object shared between all the nodes within the /// same process. - public: NodeShared *shared = nullptr; + public: std::shared_ptr shared = nullptr; /// \brief The message publisher. public: MessagePublisher publisher; @@ -864,7 +864,7 @@ bool Node::EnableStats(const std::string &_topic, bool _enable, ////////////////////////////////////////////////// NodeShared *Node::Shared() const { - return this->dataPtr->shared; + return this->dataPtr->shared.get(); } ////////////////////////////////////////////////// diff --git a/src/NodePrivate.hh b/src/NodePrivate.hh index 181140552..9fd96eeca 100644 --- a/src/NodePrivate.hh +++ b/src/NodePrivate.hh @@ -18,6 +18,7 @@ #ifndef GZ_TRANSPORT_NODEPRIVATE_HH_ #define GZ_TRANSPORT_NODEPRIVATE_HH_ +#include #include #include @@ -67,7 +68,7 @@ namespace gz /// \brief Pointer to the object shared between all the nodes within the /// same process. - public: NodeShared *shared = NodeShared::Instance(); + public: std::shared_ptr shared = NodeShared::SharedInstance(); /// \brief Partition for this node. public: std::string partition = hostname() + ":" + username(); diff --git a/src/NodeShared.cc b/src/NodeShared.cc index ce9e8a0c8..8c822a504 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -152,47 +152,47 @@ void sendAuthErrorHelper(zmq::socket_t &_socket, const std::string &_err) } ////////////////////////////////////////////////// +// LCOV_EXCL_START NodeShared *NodeShared::Instance() +{ + // This is a deprecated function, but since it's public, the following ensures + // backward compatibility by instantiating a shared_ptr that never gets + // deleted. + static std::shared_ptr nodeShared = NodeShared::SharedInstance(); + return nodeShared.get(); +} +// LCOV_EXCL_STOP + +////////////////////////////////////////////////// +std::shared_ptr NodeShared::SharedInstance() { // Create an instance of NodeShared per process so the ZMQ context // is not shared between different processes. - - static std::shared_mutex mutex; - static std::unordered_map nodeSharedMap; - - // Get current process ID. - auto pid = getProcessId(); + static std::weak_ptr nodeSharedWeak; + static std::mutex mutex; // Check if there's already a NodeShared instance for this process. - // Use a shared_lock so multiple threads can read simultaneously. - // This will only block if there's another thread locking exclusively - // for writing. Since most of the time threads will be reading, - // we make the read operation faster at the expense of making the write - // operation slower. Use exceptions for their zero-cost when successful. - try - { - std::shared_lock readLock(mutex); - return nodeSharedMap.at(pid); - } - catch (...) - { - // Multiple threads from the same process could have arrived here - // simultaneously, so after locking, we need to make sure that there's - // not an already constructed NodeShared instance for this process. - std::lock_guard writeLock(mutex); - - auto iter = nodeSharedMap.find(pid); - if (iter != nodeSharedMap.end()) - { - // There's already an instance for this process, return it. - return iter->second; - } - - // No instance, construct a new one. - auto ret = nodeSharedMap.insert({pid, new NodeShared}); - assert(ret.second); // Insert operation should be successful. - return ret.first->second; - } + std::shared_ptr nodeShared = nodeSharedWeak.lock(); + if (nodeShared) + return nodeShared; + + // Multiple threads from the same process could have arrived here + // simultaneously, so after locking, we need to make sure that there's + // not an already constructed NodeShared instance for this process. + std::lock_guard lock(mutex); + nodeShared = nodeSharedWeak.lock(); + if (nodeShared) + return nodeShared; + + // Class used to enable use of std::shared_ptr. This is needed because the + // constructor and destructor of NodeShared are protected. + class MakeSharedEnabler : public NodeShared {}; + // No instance, construct a new one. + nodeShared = std::make_shared(); + // Assign to weak_ptr so next time SharedInstance is called, we can return the + // instance we just created. + nodeSharedWeak = nodeShared; + return nodeShared; } //////////////////////////////////////////////////