Skip to content

Commit 864c921

Browse files
committed
add tiny delay at the end of Stream open to avoid race condition + test revalidation
1 parent 06cd22b commit 864c921

File tree

4 files changed

+12
-9
lines changed

4 files changed

+12
-9
lines changed

src/Engine.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ void Engine::end_pub_transaction()
138138
void Engine::pub_close()
139139
{
140140
auto self = sg4::Actor::self();
141-
XBT_DEBUG("Publisher '%s' is closing the engine", self->get_cname());
141+
XBT_DEBUG("Publisher '%s' is closing the engine '%s'", self->get_cname(), get_cname());
142142
if (not pub_closing_) {
143143
// I'm the first to close
144144
pub_closing_ = true;
@@ -221,7 +221,7 @@ void Engine::end_sub_transaction()
221221
void Engine::sub_close()
222222
{
223223
auto self = sg4::Actor::self();
224-
XBT_DEBUG("Subscriber '%s' is closing the engine", self->get_cname());
224+
XBT_DEBUG("Subscriber '%s' is closing the engine '%s'", self->get_cname(), get_cname());
225225
if (not sub_closing_) {
226226
// I'm the first to close
227227
sub_closing_ = true;

src/Stream.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
133133
dtl_->lock();
134134

135135
if (not engine_) {
136+
XBT_DEBUG("Create Engine");
136137
if (engine_type_ == Engine::Type::Staging) {
137138
engine_ = std::make_shared<StagingEngine>(name, this);
138139
} else if (engine_type_ == Engine::Type::File) {
@@ -145,7 +146,7 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
145146
dtl_->unlock();
146147

147148
while (not engine_)
148-
sg4::this_actor::sleep_for(0.05);
149+
sg4::this_actor::sleep_for(0.01);
149150

150151
// Then we register the actors calling Stream::open as publishers or subscribers in the newly created Engine.
151152
if (mode == dtlmod::Stream::Mode::Publish) {
@@ -154,9 +155,11 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
154155
engine_->add_subscriber(sg4::Actor::self(), rendez_vous_);
155156
}
156157

157-
XBT_DEBUG("Stream '%s' uses engine '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), get_engine_type_str(),
158-
get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers());
158+
XBT_DEBUG("Stream '%s' uses engine '%s' of type '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), engine_->get_cname(),
159+
get_engine_type_str(), get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers());
159160

161+
sg4::this_actor::sleep_for(0.05);
162+
160163
return engine_;
161164
}
162165

test/dtl_file_engine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ TEST_F(DTLFileEngineTest, MultiplePubSingleSubSharedStorage)
234234
XBT_INFO("Start a Transaction");
235235
ASSERT_NO_THROW(engine->begin_transaction());
236236
XBT_INFO("Transition can start as publishers have finished writing");
237-
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.651431842993127);
237+
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.701431842993127);
238238
XBT_INFO("Get the entire Variable 'var' from the DTL");
239239
ASSERT_NO_THROW(engine->get(var_sub));
240240
XBT_INFO("End a Transaction");

test/dtl_stream.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ TEST_F(DTLStreamTest, PublishFileMultipleOpen)
138138
XBT_INFO("Open the Stream in Stream::Mode::Publish mode");
139139
ASSERT_NO_THROW(engine = stream->open("zone:fs:/pfs/file", dtlmod::Stream::Mode::Publish));
140140
XBT_INFO("Check current number of publishers and subscribers");
141-
ASSERT_EQ(stream->get_num_publishers(), 1);
141+
ASSERT_EQ(stream->get_num_publishers(), 2);
142142
ASSERT_EQ(stream->get_num_subscribers(), 0);
143143
ASSERT_NO_THROW(sg4::this_actor::sleep_for(1));
144144
XBT_INFO("Close the engine");
@@ -194,8 +194,8 @@ TEST_F(DTLStreamTest, OpenWithRendezVous)
194194
ASSERT_NO_THROW(stream->set_rendez_vous());
195195
XBT_INFO("Open the Stream in Stream::Mode::Publish mode");
196196
ASSERT_NO_THROW(engine = stream->open("foo", dtlmod::Stream::Mode::Publish));
197-
XBT_INFO("Open complete. Clock should be at 10s");
198-
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.0);
197+
XBT_INFO("Open complete. Clock should be at 10.05s");
198+
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.05);
199199
XBT_INFO("Let actor %s sleep for 1 second", sg4::this_actor::get_cname());
200200
ASSERT_NO_THROW(sg4::this_actor::sleep_for(1));
201201
XBT_INFO("Close the engine");

0 commit comments

Comments
 (0)