Skip to content

Commit 3b559e5

Browse files
authored
Merge pull request #674 from ApexAI/iox-#615-block-publisher-when-subscriber-queue-full
Iox #615 block publisher when subscriber queue full
2 parents 366d5a4 + aaab010 commit 3b559e5

File tree

17 files changed

+34
-39
lines changed

17 files changed

+34
-39
lines changed

iceoryx_binding_c/include/iceoryx_binding_c/publisher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ typedef struct
4040
bool offerOnCreate;
4141

4242
/// @brief describes whether a publisher blocks when subscriber queue is full
43-
ENUM iox_SubscriberTooSlowPolicy deliveryQueueFullPolicy;
43+
ENUM iox_SubscriberTooSlowPolicy subscriberTooSlowPolicy;
4444

4545
/// @brief this value will be set exclusively by `iox_pub_options_init` and is not supposed to be modified otherwise
4646
uint64_t initCheck;

iceoryx_binding_c/include/iceoryx_binding_c/subscriber.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ typedef struct
4343
bool subscribeOnCreate;
4444

4545
/// @brief describes whether a publisher blocks when subscriber queue is full
46-
ENUM iox_QueueFullPolicy receiverQueueFullPolicy;
46+
ENUM iox_QueueFullPolicy queueFullPolicy;
4747

4848
/// @brief this value will be set exclusively by iox_sub_options_init and is not supposed to be modified otherwise
4949
uint64_t initCheck;

iceoryx_binding_c/source/c_publisher.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ void iox_pub_options_init(iox_pub_options_t* options)
4747
options->historyCapacity = publisherOptions.historyCapacity;
4848
options->nodeName = nullptr;
4949
options->offerOnCreate = publisherOptions.offerOnCreate;
50-
options->deliveryQueueFullPolicy = cpp2c::subscriberTooSlowPolicy(publisherOptions.deliveryQueueFullPolicy);
50+
options->subscriberTooSlowPolicy = cpp2c::subscriberTooSlowPolicy(publisherOptions.subscriberTooSlowPolicy);
5151

5252
options->initCheck = PUBLISHER_OPTIONS_INIT_CHECK_CONSTANT;
5353
}
@@ -90,7 +90,7 @@ iox_pub_t iox_pub_init(iox_pub_storage_t* self,
9090
publisherOptions.nodeName = NodeName_t(TruncateToCapacity, options->nodeName);
9191
}
9292
publisherOptions.offerOnCreate = options->offerOnCreate;
93-
publisherOptions.deliveryQueueFullPolicy = c2cpp::subscriberTooSlowPolicy(options->deliveryQueueFullPolicy);
93+
publisherOptions.subscriberTooSlowPolicy = c2cpp::subscriberTooSlowPolicy(options->subscriberTooSlowPolicy);
9494
}
9595

9696
me->m_portData = PoshRuntime::getInstance().getMiddlewarePublisher(

iceoryx_binding_c/source/c_subscriber.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ void iox_sub_options_init(iox_sub_options_t* options)
5353
options->historyRequest = subscriberOptions.historyRequest;
5454
options->nodeName = nullptr;
5555
options->subscribeOnCreate = subscriberOptions.subscribeOnCreate;
56-
options->receiverQueueFullPolicy = cpp2c::queueFullPolicy(subscriberOptions.receiverQueueFullPolicy);
56+
options->queueFullPolicy = cpp2c::queueFullPolicy(subscriberOptions.queueFullPolicy);
5757

5858
options->initCheck = SUBSCRIBER_OPTIONS_INIT_CHECK_CONSTANT;
5959
}
@@ -97,7 +97,7 @@ iox_sub_t iox_sub_init(iox_sub_storage_t* self,
9797
subscriberOptions.nodeName = NodeName_t(TruncateToCapacity, options->nodeName);
9898
}
9999
subscriberOptions.subscribeOnCreate = options->subscribeOnCreate;
100-
subscriberOptions.receiverQueueFullPolicy = c2cpp::queueFullPolicy(options->receiverQueueFullPolicy);
100+
subscriberOptions.queueFullPolicy = c2cpp::queueFullPolicy(options->queueFullPolicy);
101101
}
102102

103103
me->m_portData =

iceoryx_binding_c/test/moduletests/test_publisher.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ TEST(iox_pub_options_test, publisherOptionsAreInitializedCorrectly)
353353
sut.historyCapacity = 37;
354354
sut.nodeName = "Dr.Gonzo";
355355
sut.offerOnCreate = false;
356-
sut.deliveryQueueFullPolicy = SubscriberTooSlowPolicy_WAIT_FOR_SUBSCRIBER;
356+
sut.subscriberTooSlowPolicy = SubscriberTooSlowPolicy_WAIT_FOR_SUBSCRIBER;
357357

358358
PublisherOptions options;
359359
// set offerOnCreate to the opposite of the expected default to check if it gets overwritten to default
@@ -363,7 +363,7 @@ TEST(iox_pub_options_test, publisherOptionsAreInitializedCorrectly)
363363
EXPECT_EQ(sut.historyCapacity, options.historyCapacity);
364364
EXPECT_EQ(sut.nodeName, nullptr);
365365
EXPECT_EQ(sut.offerOnCreate, options.offerOnCreate);
366-
EXPECT_EQ(sut.deliveryQueueFullPolicy, cpp2c::subscriberTooSlowPolicy(options.deliveryQueueFullPolicy));
366+
EXPECT_EQ(sut.subscriberTooSlowPolicy, cpp2c::subscriberTooSlowPolicy(options.subscriberTooSlowPolicy));
367367
EXPECT_TRUE(iox_pub_options_is_initialized(&sut));
368368
}
369369

iceoryx_binding_c/test/moduletests/test_subscriber.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ TEST(iox_sub_options_test, subscriberOptionsAreInitializedCorrectly)
408408
sut.historyRequest = 73;
409409
sut.nodeName = "Dr.Gonzo";
410410
sut.subscribeOnCreate = false;
411-
sut.receiverQueueFullPolicy = QueueFullPolicy_BLOCK_PUBLISHER;
411+
sut.queueFullPolicy = QueueFullPolicy_BLOCK_PUBLISHER;
412412

413413
SubscriberOptions options;
414414
// set subscribeOnCreate to the opposite of the expected default to check if it gets overwritten to default
@@ -419,7 +419,7 @@ TEST(iox_sub_options_test, subscriberOptionsAreInitializedCorrectly)
419419
EXPECT_EQ(sut.historyRequest, options.historyRequest);
420420
EXPECT_EQ(sut.nodeName, nullptr);
421421
EXPECT_EQ(sut.subscribeOnCreate, options.subscribeOnCreate);
422-
EXPECT_EQ(sut.receiverQueueFullPolicy, cpp2c::queueFullPolicy(options.receiverQueueFullPolicy));
422+
EXPECT_EQ(sut.queueFullPolicy, cpp2c::queueFullPolicy(options.queueFullPolicy));
423423
EXPECT_TRUE(iox_sub_options_is_initialized(&sut));
424424
}
425425

iceoryx_examples/icedelivery/iox_publisher_with_options.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ int main()
5151
// grouping of publishers and subscribers within a process
5252
publisherOptions.nodeName = "Pub_Node_With_Options";
5353

54+
// we allow the subscribers to block the publisher if they want to ensure that no samples are lost
55+
publisherOptions.subscriberTooSlowPolicy = iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER;
56+
5457
iox::popo::Publisher<RadarObject> publisher({"Radar", "FrontLeft", "Object"}, publisherOptions);
5558

5659
// we have to explicitely offer the publisher for making it visible to subscribers

iceoryx_examples/icedelivery/iox_subscriber_with_options.cpp

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ int main()
5858
// grouping of publishers and subscribers within a process
5959
subscriberOptions.nodeName = "Sub_Node_With_Options";
6060

61+
// we request the publisher to wait for space in the queue if it is full. The publisher will be blocked then
62+
subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PUBLISHER;
63+
6164
iox::popo::Subscriber<RadarObject> subscriber({"Radar", "FrontLeft", "Object"}, subscriberOptions);
6265

6366
// We have to explicitly call subscribe() otherwise the subscriber will not try to connect to publishers
@@ -66,19 +69,8 @@ int main()
6669
// run until interrupted by Ctrl-C
6770
while (!killswitch)
6871
{
69-
if (subscriber.getSubscriptionState() == iox::SubscribeState::SUBSCRIBED)
70-
{
71-
bool hasMoreSamples = true;
72-
// Since we are checking only every second but the publisher is sending every
73-
// 400ms a new sample we will receive here more then one sample.
74-
do
75-
{
76-
subscriber.take()
77-
.and_then([](auto& object) { std::cout << APP_NAME << " got value: " << object->x << std::endl; })
78-
.or_else([&](auto&) { hasMoreSamples = false; });
79-
} while (hasMoreSamples);
80-
}
81-
std::cout << std::endl;
72+
subscriber.take().and_then(
73+
[](auto& object) { std::cout << APP_NAME << " got value: " << object->x << std::endl; });
8274

8375
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
8476
}

iceoryx_posh/include/iceoryx_posh/popo/publisher_options.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ struct PublisherOptions
3838
bool offerOnCreate{true};
3939

4040
/// @brief The option whether the publisher should block when the subscriber queue is full
41-
SubscriberTooSlowPolicy deliveryQueueFullPolicy{SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA};
41+
SubscriberTooSlowPolicy subscriberTooSlowPolicy{SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA};
4242
};
4343

4444
} // namespace popo

iceoryx_posh/include/iceoryx_posh/popo/subscriber_options.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ struct SubscriberOptions
4343
bool subscribeOnCreate{true};
4444

4545
/// @brief The option whether the publisher should block when the subscriber queue is full
46-
QueueFullPolicy receiverQueueFullPolicy{QueueFullPolicy::DISCARD_OLDEST_DATA};
46+
QueueFullPolicy queueFullPolicy{QueueFullPolicy::DISCARD_OLDEST_DATA};
4747
};
4848

4949
} // namespace popo

0 commit comments

Comments
 (0)