Skip to content

Commit 6482be8

Browse files
committed
properly wait for completion of sub transactions
1 parent 9d815a7 commit 6482be8

File tree

2 files changed

+28
-24
lines changed

2 files changed

+28
-24
lines changed

include/dtlmod/Engine.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ class Engine {
7070
sg4::ActivitySet sub_transaction_;
7171
sg4::MutexPtr sub_mutex_;
7272
sg4::ConditionVariablePtr first_pub_transaction_started_;
73-
sg4::ConditionVariablePtr first_pub_transaction_completed_;
73+
sg4::ConditionVariablePtr sub_transaction_started_;
74+
sg4::ConditionVariablePtr pub_transaction_completed_;
7475

7576
unsigned int sub_transaction_id_ = 1;
7677
bool sub_transaction_in_progress_ = false;
@@ -113,7 +114,8 @@ class Engine {
113114
, pub_mutex_(sg4::Mutex::create())
114115
, sub_mutex_(sg4::Mutex::create())
115116
, first_pub_transaction_started_(sg4::ConditionVariable::create())
116-
, first_pub_transaction_completed_(sg4::ConditionVariable::create())
117+
, sub_transaction_started_(sg4::ConditionVariable::create())
118+
, pub_transaction_completed_(sg4::ConditionVariable::create())
117119
{
118120
}
119121
virtual ~Engine() = default;

src/Engine.cpp

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -99,27 +99,37 @@ void Engine::begin_pub_transaction()
9999
// Only one publisher has to do this
100100
std::unique_lock<sg4::Mutex> lock(*pub_mutex_);
101101
if (not pub_transaction_in_progress_) {
102-
XBT_DEBUG("Publish Transaction %u started by %s", pub_transaction_id_, sg4::Actor::self()->get_cname());
103102
pub_transaction_in_progress_ = true;
104103
if (not pub_barrier_) { // This is the first transaction.
105104
if (not publishers_.empty()) { // Assume all publishers have opened the stream and create a barrier
106105
XBT_DEBUG("Create a barrier for %zu publishers", publishers_.size());
107106
pub_barrier_ = sg4::Barrier::create(publishers_.size());
108107
first_pub_transaction_started_->notify_all();
109108
}
109+
110110
} else {
111111
// Wait for the completion of the Publish activities from the previous transaction
112112
XBT_DEBUG("Wait for the completion of %u publish activities from the previous transaction",
113113
pub_transaction_.size());
114114
pub_transaction_.wait_all();
115115
XBT_DEBUG("All on-flight publish activities are completed. Proceed with the current transaction.");
116-
pub_transaction_.clear();
117116
pub_transaction_id_++;
117+
XBT_DEBUG("%u sub activities pending", sub_transaction_.size());
118118
if (pub_transaction_id_ >= sub_transaction_id_) {
119+
pub_transaction_.clear();
119120
// We may have subscribers waiting for a transaction to be over. Notify them
120-
first_pub_transaction_completed_->notify_all();
121+
pub_transaction_completed_->notify_all();
121122
}
122123
}
124+
XBT_DEBUG("Publish Transaction %u started by %s", pub_transaction_id_, sg4::Actor::self()->get_cname());
125+
}
126+
if (type_ == Type::Staging) {
127+
XBT_DEBUG("Maybe I should wait: %zu subscribers and %u <= %u" , get_num_subscribers(), pub_transaction_id_, sub_transaction_id_ -1);
128+
while (get_num_subscribers() == 0 || pub_transaction_id_ < sub_transaction_id_ -1 ) {
129+
XBT_DEBUG("Wait");
130+
sub_transaction_started_->wait(lock);
131+
}
132+
123133
}
124134
}
125135

@@ -151,7 +161,7 @@ void Engine::pub_close()
151161
if (type_ == Type::File) {
152162
if (get_num_subscribers() > 0 && pub_transaction_id_ >= sub_transaction_id_) {
153163
// We may have subscribers waiting for a transaction to be over. Notify them
154-
first_pub_transaction_completed_->notify_all();
164+
pub_transaction_completed_->notify_all();
155165
}
156166
}
157167
}
@@ -183,39 +193,37 @@ void Engine::begin_sub_transaction()
183193
// We have publishers on that stream, wait for them to complete a transaction first
184194
if (type_ == Type::File && get_num_publishers() > 0) {
185195
while (pub_transaction_id_ < sub_transaction_id_)
186-
first_pub_transaction_completed_->wait(lock);
196+
pub_transaction_completed_->wait(lock);
187197
}
188-
189198
if (not sub_transaction_in_progress_) {
199+
if (type_ == Type::Staging && pub_transaction_id_ == sub_transaction_id_ -1)
200+
sub_transaction_started_->notify_all();
190201
XBT_DEBUG("Subscribe Transaction %u started by %s", sub_transaction_id_, sg4::Actor::self()->get_cname());
191202
sub_transaction_in_progress_ = true;
192203
if (not sub_barrier_) { // This is the first transaction.
193204
if (not subscribers_.empty()) { // Assume all subscribers have opened the stream and create a barrier
194205
XBT_DEBUG("Create a barrier for %zu subscribers", subscribers_.size());
195206
sub_barrier_ = sg4::Barrier::create(subscribers_.size());
196207
}
197-
} else {
198-
// wait for the completion of the Subscribe activities from the previous transaction
199-
XBT_DEBUG("Wait for the completion of %u subscribe activities from the previous transaction",
200-
sub_transaction_.size());
201-
sub_transaction_.wait_all();
202-
XBT_DEBUG("All on-flight subscribe activities are completed. Proceed with the current transaction.");
203-
sub_transaction_.clear();
204208
}
205209
}
206210
}
207211

208212
void Engine::end_sub_transaction()
209213
{
210214
if (sub_barrier_ && sub_barrier_->wait()) { // I'm the last subscriber entering the barrier
211-
XBT_DEBUG("Start the %d subscribe activities for the transaction", sub_transaction_.size());
215+
XBT_DEBUG("Wait for the %d subscribe activities for the transaction", sub_transaction_.size());
212216
for (unsigned int i = 0; i < sub_transaction_.size(); i++)
213-
sub_transaction_.at(i)->resume();
214-
217+
sub_transaction_.at(i)->resume()->wait();
218+
XBT_DEBUG("All on-flight subscribe activities are completed. Proceed with the current transaction.");
219+
sub_transaction_.clear();
215220
// Mark this transaction as over
216221
sub_transaction_in_progress_ = false;
217222
sub_transaction_id_++;
218223
}
224+
// Prevent subscribers to start a new transaction before this one is really over
225+
if (sub_barrier_)
226+
sub_barrier_->wait();
219227
}
220228

221229
void Engine::sub_close()
@@ -225,12 +233,6 @@ void Engine::sub_close()
225233
if (not sub_closing_) {
226234
// I'm the first to close
227235
sub_closing_ = true;
228-
XBT_DEBUG("Wait for the completion of %u subscribe activities from the previous transaction",
229-
sub_transaction_.size());
230-
sub_transaction_.wait_all();
231-
sub_transaction_.clear();
232-
XBT_DEBUG("last subscribe transaction is over");
233-
sub_transaction_id_++;
234236
}
235237
rm_subscriber(self);
236238

0 commit comments

Comments
 (0)