Skip to content

Commit

Permalink
Revert "Use std::shared_ptr for gz::transport::NodeShared (gazebo…
Browse files Browse the repository at this point in the history
…sim#484)"

This reverts commit 42e4210.

Signed-off-by: Addisu Z. Taddese <[email protected]>
  • Loading branch information
azeey committed Apr 1, 2024
1 parent 46116e1 commit 0099c2d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 51 deletions.
8 changes: 1 addition & 7 deletions include/gz/transport/NodeShared.hh
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,8 @@ namespace gz
{
/// \brief NodeShared is a singleton. This method gets the
/// NodeShared instance shared between all the nodes.
/// Note: This is deprecated. Please use \sa SharedInstance
/// \return Pointer to the current NodeShared instance.
public: static NodeShared GZ_DEPRECATED(13) *Instance();

/// \brief NodeShared is a singleton. This method gets the
/// a reference counted NodeShared instance shared between all the nodes.
/// \return A shared_ptr to the current NodeShared instance.
public: static std::shared_ptr<NodeShared> SharedInstance();
public: static NodeShared *Instance();

/// \brief Receive data and control messages.
public: void RunReceptionTask();
Expand Down
2 changes: 1 addition & 1 deletion log/src/Recorder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ Recorder::Implementation::Implementation()
this->OnMessageReceived(_data, _len, _info);
};

auto shared = NodeShared::SharedInstance();
auto shared = NodeShared::Instance();

this->discovery = std::make_unique<MsgDiscovery>(
Uuid().ToString(), shared->discoveryIP, shared->msgDiscPort);
Expand Down
12 changes: 6 additions & 6 deletions src/Node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ namespace gz
//////////////////////////////////////////////////
int rcvHwm()
{
return NodeShared::SharedInstance()->RcvHwm();
return NodeShared::Instance()->RcvHwm();
}

//////////////////////////////////////////////////
int sndHwm()
{
return NodeShared::SharedInstance()->SndHwm();
return NodeShared::Instance()->SndHwm();
}

//////////////////////////////////////////////////
Expand All @@ -104,14 +104,14 @@ namespace gz
{
/// \brief Default constructor.
public: PublisherPrivate()
: shared(NodeShared::SharedInstance())
: shared(NodeShared::Instance())
{
}

/// \brief Constructor
/// \param[in] _publisher The message publisher.
public: explicit PublisherPrivate(const MessagePublisher &_publisher)
: shared(NodeShared::SharedInstance()),
: shared(NodeShared::Instance()),
publisher(_publisher)
{
}
Expand Down Expand Up @@ -189,7 +189,7 @@ namespace gz

/// \brief Pointer to the object shared between all the nodes within the
/// same process.
public: std::shared_ptr<NodeShared> shared = nullptr;
public: NodeShared *shared = nullptr;

/// \brief The message publisher.
public: MessagePublisher publisher;
Expand Down Expand Up @@ -864,7 +864,7 @@ bool Node::EnableStats(const std::string &_topic, bool _enable,
//////////////////////////////////////////////////
NodeShared *Node::Shared() const
{
return this->dataPtr->shared.get();
return this->dataPtr->shared;
}

//////////////////////////////////////////////////
Expand Down
3 changes: 1 addition & 2 deletions src/NodePrivate.hh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#ifndef GZ_TRANSPORT_NODEPRIVATE_HH_
#define GZ_TRANSPORT_NODEPRIVATE_HH_

#include <memory>
#include <string>
#include <unordered_set>

Expand Down Expand Up @@ -68,7 +67,7 @@ namespace gz

/// \brief Pointer to the object shared between all the nodes within the
/// same process.
public: std::shared_ptr<NodeShared> shared = NodeShared::SharedInstance();
public: NodeShared *shared = NodeShared::Instance();

/// \brief Partition for this node.
public: std::string partition = hostname() + ":" + username();
Expand Down
70 changes: 35 additions & 35 deletions src/NodeShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,47 +152,47 @@ void sendAuthErrorHelper(zmq::socket_t &_socket, const std::string &_err)
}

//////////////////////////////////////////////////
// LCOV_EXCL_START
NodeShared *NodeShared::Instance()
{
// This is a deprecated function, but since it's public, the following ensures
// backward compatibility by instantiating a shared_ptr that never gets
// deleted.
static std::shared_ptr<NodeShared> nodeShared = NodeShared::SharedInstance();
return nodeShared.get();
}
// LCOV_EXCL_STOP

//////////////////////////////////////////////////
std::shared_ptr<NodeShared> NodeShared::SharedInstance()
{
// Create an instance of NodeShared per process so the ZMQ context
// is not shared between different processes.
static std::weak_ptr<NodeShared> nodeSharedWeak;
static std::mutex mutex;

static std::shared_mutex mutex;
static std::unordered_map<unsigned int, NodeShared*> nodeSharedMap;

// Get current process ID.
auto pid = getProcessId();

// Check if there's already a NodeShared instance for this process.
std::shared_ptr<NodeShared> nodeShared = nodeSharedWeak.lock();
if (nodeShared)
return nodeShared;

// Multiple threads from the same process could have arrived here
// simultaneously, so after locking, we need to make sure that there's
// not an already constructed NodeShared instance for this process.
std::lock_guard lock(mutex);
nodeShared = nodeSharedWeak.lock();
if (nodeShared)
return nodeShared;

// Class used to enable use of std::shared_ptr. This is needed because the
// constructor and destructor of NodeShared are protected.
class MakeSharedEnabler : public NodeShared {};
// No instance, construct a new one.
nodeShared = std::make_shared<MakeSharedEnabler>();
// Assign to weak_ptr so next time SharedInstance is called, we can return the
// instance we just created.
nodeSharedWeak = nodeShared;
return nodeShared;
// Use a shared_lock so multiple threads can read simultaneously.
// This will only block if there's another thread locking exclusively
// for writing. Since most of the time threads will be reading,
// we make the read operation faster at the expense of making the write
// operation slower. Use exceptions for their zero-cost when successful.
try
{
std::shared_lock readLock(mutex);
return nodeSharedMap.at(pid);
}
catch (...)
{
// Multiple threads from the same process could have arrived here
// simultaneously, so after locking, we need to make sure that there's
// not an already constructed NodeShared instance for this process.
std::lock_guard writeLock(mutex);

auto iter = nodeSharedMap.find(pid);
if (iter != nodeSharedMap.end())
{
// There's already an instance for this process, return it.
return iter->second;
}

// No instance, construct a new one.
auto ret = nodeSharedMap.insert({pid, new NodeShared});
assert(ret.second); // Insert operation should be successful.
return ret.first->second;
}
}

//////////////////////////////////////////////////
Expand Down

0 comments on commit 0099c2d

Please sign in to comment.