Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for timer/srv-server finalization issue 2283 #2284

Open
wants to merge 4 commits into
base: noetic-devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions clients/roscpp/src/libros/callback_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ void CallbackQueue::removeByID(uint64_t removal_id)
}

{
boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex, boost::defer_lock);
if (rw_lock.try_lock())
// Unless we're removing from callback, we lock the calling mutex to ensure callback is not being executed.
if (tls_->calling_in_this_thread != id_info->id)
{
boost::unique_lock<boost::shared_mutex> rw_lock(id_info->calling_rw_mutex);
boost::mutex::scoped_lock lock(mutex_);
D_CallbackInfo::iterator it = callbacks_.begin();
for (; it != callbacks_.end();)
Expand All @@ -186,8 +187,8 @@ void CallbackQueue::removeByID(uint64_t removal_id)
}
else
{
// We failed to acquire the lock, it can be that we are trying to remove something from the callback queue
// while it is being executed. Mark it for removal and let it be cleaned up later.
// Since we're removing from callback, locking twice would deadlock.
// Instead, mark callback for removal and let it be cleaned up later.
boost::mutex::scoped_lock lock(mutex_);
for (D_CallbackInfo::iterator it = callbacks_.begin(); it != callbacks_.end(); it++)
{
Expand Down
124 changes: 76 additions & 48 deletions test/test_roscpp/test/test_callback_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/thread/reverse_lock.hpp>
#include <boost/function.hpp>

using namespace ros;
Expand All @@ -66,6 +67,26 @@ class CountingCallback : public CallbackInterface
};
typedef boost::shared_ptr<CountingCallback> CountingCallbackPtr;

/// Callback that runs an arbitrary user-supplied function and keeps track of
/// how many invocations have completed.
struct CustomCallback : public CallbackInterface
{
  /// Stores a copy of @p fun; the invocation counter starts at zero.
  template <typename Func>
  CustomCallback(Func fun)
    : function(fun)
    , count(0)
  {
  }

  /// Runs the stored function first and only afterwards records the call,
  /// so `count` reflects finished invocations rather than started ones.
  virtual CallResult call()
  {
    function();
    count += 1;
    return Success;
  }

  boost::function<void()> function;  // work executed on every call()
  size_t count;                      // number of completed call() invocations
};
typedef boost::shared_ptr<CustomCallback> CustomCallbackPtr;

void callAvailableThread(CallbackQueue* queue, bool& done)
{
while (!done)
Expand Down Expand Up @@ -177,80 +198,90 @@ TEST(CallbackQueue, removeSelf)
queue.callOne();

queue.addCallback(cb2, 1);

queue.callAvailable();

EXPECT_EQ(cb1->count, 1U);
EXPECT_EQ(cb2->count, 1U);
}

/// Callback whose call() has to acquire an internal mutex before it can
/// proceed, which lets a test stall callback processing by holding that mutex.
class BlockingCallback : public CallbackInterface
{
public:
  BlockingCallback()
    : count(0)
  {
  }

  /// Locks mutex_ (waiting for as long as someone else holds it) and then
  /// counts the invocation. The mutex is still locked when this returns;
  /// nothing in this class releases it.
  virtual CallResult call()
  {
    mutex_.lock();
    count += 1;
    return Success;
  }

  boost::mutex mutex_;  // public so that test code can lock/unlock it directly
  size_t count;         // number of invocations that got past the lock
};
typedef boost::shared_ptr<BlockingCallback> BlockingCallbackPtr;


// This test checks that removing callbacks by an id doesn't block if one of those callbacks is being executed.
TEST(CallbackQueue, removeCallbackWhileExecuting)
// This test checks that self-removing callbacks by an id doesn't block if one of those callbacks is being executed.
TEST(CallbackQueue, selfRemoveCallbackWhileExecuting)
{
const uint64_t owner_id = 1;
const uint64_t unrelated_id = 2;

boost::mutex external_mtx;

CallbackQueue queue;
BlockingCallbackPtr cb1(boost::make_shared<BlockingCallback>());
CustomCallbackPtr cb1(boost::make_shared<CustomCallback>([&]() {
boost::unique_lock<boost::mutex> external_lock(external_mtx);
boost::this_thread::sleep_for(boost::chrono::milliseconds(300));

{
boost::reverse_lock<boost::unique_lock<boost::mutex>> unlocker(external_lock);
queue.removeByID(owner_id); // external thread blocks here, spinner doesn't
}
}));
CountingCallbackPtr cb2(boost::make_shared<CountingCallback>());
CountingCallbackPtr cb3(boost::make_shared<CountingCallback>());

cb1->mutex_.lock(); // lock the mutex to ensure the blocking callback will stall processing of callback queue.

queue.addCallback(cb1, owner_id); // Add the blocking callback.

// Now, we need to serve the callback queue from another thread.
bool done = false;
boost::thread t = boost::thread(boost::bind(&callAvailableThread, &queue, boost::ref(done)));

ros::WallDuration(1.0).sleep(); // Callback 1 should be being served now.

queue.addCallback(cb1, owner_id); // Add the self-removing callback.
queue.addCallback(cb2, unrelated_id); // Add a second callback with different owner.
queue.addCallback(cb3, owner_id); // Add a third with same owner, this one should never get executed.

// Now try to remove the callback that's being executed.
queue.removeByID(owner_id); // This should not block because cb1 is being served, it should prevent cb3 from running.
// Let's use an external thread to execute cb function and hold its external lock
boost::thread t1([&]() { cb1->call(); });
boost::this_thread::sleep_for(boost::chrono::milliseconds(100));

ros::WallDuration(1.0).sleep();

// The removeByID should not block, so now we can unblock the blocking callback.
cb1->mutex_.unlock(); // This allows processing of cb1 to continue.
// Now, we need to serve the callback queue from another thread.
bool done = false;
boost::thread t2([&]() { callAvailableThread(&queue, done); });

while (!queue.isEmpty()) // Wait until the queue is empty.
{
ros::WallDuration(0.01).sleep();
}

// Properly shut down our callback serving thread.
// Properly shut down our threads.
done = true;
t.join();
t2.join();
t1.join();

EXPECT_EQ(cb1->count, 1U);
EXPECT_EQ(cb1->count, 2U);
EXPECT_EQ(cb2->count, 1U);
EXPECT_EQ(cb3->count, 0U);
}

// This test checks that a non-spinner thread calling removeByID() blocks until
// the callback that is currently being executed has finished.
TEST(CallbackQueue, removeCallbackWhileExecuting)
{
  const uint64_t cb_id = 1;
  // Two-party rendezvous point shared by this test thread and the callback body.
  boost::barrier barrier(2);

  CallbackQueue queue;
  CustomCallbackPtr cb(boost::make_shared<CustomCallback>([&]() {
    barrier.wait();  // first rendezvous: tells the test the callback has started
    barrier.wait();  // second rendezvous: keeps the callback running until the test releases it
  }));
  queue.addCallback(cb, cb_id);

  // Let's ensure the spinner thread is executing the callback now
  bool done = false;
  boost::thread t1([&]() { callAvailableThread(&queue, done); });
  barrier.wait();

  // External removing thread blocks on removeByID
  boost::thread t2([&]() { queue.removeByID(cb_id); });
  EXPECT_FALSE(t2.try_join_for(boost::chrono::milliseconds(200))); // removeByID blocks until cb finishes

  // When the callback finishes, the external thread proceeds
  barrier.wait();
  t2.join();
  EXPECT_EQ(cb->count, 1U);

  // Properly shut down the callback serving thread.
  done = true;
  t1.join();
}

class RecursiveCallback : public CallbackInterface
Expand Down Expand Up @@ -549,6 +580,3 @@ int main(int argc, char** argv)
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}