Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds resume functionality #33

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/consumer-state-diagram.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Consumer states

The diagram shows the state changes based on the Broker messages we receive.

## `CANCELLING_BUT_RESUMING`

New state `CANCELLING_BUT_RESUMING` was added to support resume after cancellation.

- calling `resume()` in a `CANCELLING` state sets the state to `CANCELLING_BUT_RESUMING` which will enable consumer restart

- previously cancelled consumer would be restarted after `resume()` call

![image](https://github.com/mvrsss/rmqcpp/assets/60746841/54b025ee-3d7a-48de-bbcf-97ba00abf76b)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
![image](https://github.com/mvrsss/rmqcpp/assets/60746841/54b025ee-3d7a-48de-bbcf-97ba00abf76b)
![image](./consumer-state-diagram.png)

Need to commit the image into the git repo, but we could make that a separate PR to facilitate merging this now

1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ if(NOT (CMAKE_CXX_COMPILER_ID STREQUAL "SunPro") AND NOT (CMAKE_CXX_COMPILER_ID
# The examples don't need to work on these platforms
add_subdirectory(helloworld)
add_subdirectory(topology)
add_subdirectory(resume)
endif()
10 changes: 10 additions & 0 deletions examples/resume/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
find_package(bsl REQUIRED)

add_executable(resume_consumer
resume.m.cpp
)

target_link_libraries(resume_consumer PUBLIC
rmq
bsl
)
217 changes: 217 additions & 0 deletions examples/resume/resume.m.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
#include <rmqa_connectionstring.h>
#include <rmqa_consumer.h>
#include <rmqa_producer.h>
#include <rmqa_rabbitcontext.h>
#include <rmqa_topology.h>
#include <rmqa_vhost.h>
#include <rmqp_producer.h>
#include <rmqt_confirmresponse.h>
#include <rmqt_exchange.h>
#include <rmqt_message.h>
#include <rmqt_result.h>
#include <rmqt_vhostinfo.h>

#include <ball_severityutil.h>
#include <ball_streamobserver.h>

#include <bsl_iostream.h>
#include <bsl_memory.h>
#include <bsl_optional.h>
#include <bsl_vector.h>
#include <bsls_atomic.h>

#include <chrono>
#include <string>
#include <thread>

using namespace BloombergLP;
using namespace std::chrono_literals;

namespace {
BALL_LOG_SET_NAMESPACE_CATEGORY("RESUME.CONSUMER")

bsls::AtomicInt consumedCount;
} // namespace

void configLog(ball::Severity::Level level)
{
static ball::LoggerManagerConfiguration configuration;
static bsl::shared_ptr<ball::StreamObserver> loggingObserver =
bsl::make_shared<ball::StreamObserver>(&bsl::cout);

configuration.setDefaultThresholdLevelsIfValid(level);

static ball::LoggerManagerScopedGuard guard(configuration);
ball::LoggerManager::singleton().registerObserver(loggingObserver,
"stdout");
}

int main(int argc, char** argv)
{
if (argc < 2) {
std::cerr << "USAGE: " << argv[0] << " <amqp uri>\n";
return 1;
}

configLog(ball::Severity::e_INFO);
rmqa::RabbitContext rabbit;

bsl::optional<rmqt::VHostInfo> vhostInfo =
rmqa::ConnectionString::parse(argv[1]);

if (!vhostInfo) {
std::cerr << "Failed to parse connection string: " << argv[1] << "\n";
return 1;
}

// Returns immediately, setup performed on a different thread
bsl::shared_ptr<rmqa::VHost> vhost = rabbit.createVHostConnection(
"Sample code for a producer", // Connecion Name Visible in management UI
vhostInfo.value());

// How many messages can be awaiting confirmation before `send` blocks
const uint16_t maxOutstandingConfirms = 10;

rmqa::Topology topology;
rmqt::ExchangeHandle exch = topology.addExchange("exchange");
rmqt::QueueHandle queue = topology.addQueue("queue1");

topology.bind(exch, queue, "routing_key");

rmqt::Result<rmqa::Producer> producerResult =
vhost->createProducer(topology, exch, maxOutstandingConfirms);
if (!producerResult) {
// A fatal error such as `exch` not being present in `topology`
// A disconnection, or will never permanently fail an operation
std::cerr << "Error creating connection: " << producerResult.error()
<< "\n";
return 1;
}

bsl::shared_ptr<rmqa::Producer> producer = producerResult.value();

bsl::vector<uint8_t> messageVector = {5, 3, 1};

// `send` will block until a confirm is received if the
// `maxOutstandingConfirms` limit is reached

uint8_t i = 0;
while (i++ < 10) {
messageVector.push_back(i);
rmqt::Message message(
bsl::make_shared<bsl::vector<uint8_t> >(messageVector));
const rmqp::Producer::SendStatus sendResult = producer->send(
message,
"routing_key",
[](const rmqt::Message& message,
const bsl::string& routingKey,
const rmqt::ConfirmResponse& response) {
// https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed
if (response.status() == rmqt::ConfirmResponse::ACK) {
// Message is now guaranteed to be safe with the broker.
// Now is the time to reply to the request, commit the
// database transaction, or ack the RabbitMQ message which
// triggered this publish
}
else {
// Send error response, rollback transaction, nack message
// and/or raise an alarm for investigation - your message is
// not delivered to all (or perhaps any) of the queues it
// was intended for.
std::cerr << "Message not confirmed: " << message.guid()
<< " for routing key " << routingKey << " "
<< response << "\n";
}
});

if (sendResult != rmqp::Producer::SENDING) {
if (sendResult == rmqp::Producer::DUPLICATE) {
std::cerr
<< "Failed to send message: " << message.guid()
<< " because an identical GUID is already outstanding\n";
}
else {
std::cerr << "Unknown send failure for message: "
<< message.guid() << "\n";
}
return 1;
}
}

rmqt::Result<rmqa::Consumer> consumerResult = vhost->createConsumer(
topology,
queue,
[](rmqp::MessageGuard& messageGuard) {
std::this_thread::sleep_for(2000ms);
messageGuard.ack();
++consumedCount;
// std::cout << messageGuard.message() << std::endl;
}, // Consumer callback invoked on each message
"consumer-1.0", // Consumer Label (shows in Management UI)
10 // prefetch count
);

if (!consumerResult) {
return -1;
}

bsl::shared_ptr<rmqa::Consumer> consumer = consumerResult.value();

// Wait for consumer to start consuming
std::this_thread::sleep_for(10000ms);
std::cout << "Number of messages consumed before cancelling: "
<< consumedCount << "\n";
consumer->cancel();

consumer->resume();
// Wait for consumer to consume messages after resuming
std::this_thread::sleep_for(10000ms);

while (i++ < 40) {
messageVector.push_back(i);
rmqt::Message message(
bsl::make_shared<bsl::vector<uint8_t> >(messageVector));
const rmqp::Producer::SendStatus sendResult = producer->send(
message,
"routing_key",
[](const rmqt::Message& message,
const bsl::string& routingKey,
const rmqt::ConfirmResponse& response) {
// https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed
if (response.status() == rmqt::ConfirmResponse::ACK) {
// Message is now guaranteed to be safe with the broker.
// Now is the time to reply to the request, commit the
// database transaction, or ack the RabbitMQ message which
// triggered this publish
}
else {
// Send error response, rollback transaction, nack message
// and/or raise an alarm for investigation - your message is
// not delivered to all (or perhaps any) of the queues it
// was intended for.
std::cerr << "Message not confirmed: " << message.guid()
<< " for routing key " << routingKey << " "
<< response << "\n";
}
});

if (sendResult != rmqp::Producer::SENDING) {
if (sendResult == rmqp::Producer::DUPLICATE) {
std::cerr
<< "Failed to send message: " << message.guid()
<< " because an identical GUID is already outstanding\n";
}
else {
std::cerr << "Unknown send failure for message: "
<< message.guid() << "\n";
}
return 1;
}
}

std::cout << "Total number of messages consumed: " << consumedCount;

// placeholder to keep channel open while resuming
uint16_t placeholder = 0;
std::cin >> placeholder;
}
34 changes: 29 additions & 5 deletions src/rmq/rmqa/rmqa_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,42 @@

namespace BloombergLP {
namespace rmqa {
namespace {
BALL_LOG_SET_NAMESPACE_CATEGORY("RMQA.CONSUMER")

} // namespace

Consumer::Consumer(bslma::ManagedPtr<rmqp::Consumer>& impl)
: d_impl(impl)
{
}

void Consumer::cancel()
rmqt::Result<> Consumer::cancel(const bsls::TimeInterval& timeout)
{
rmqt::Future<> cancelFuture = d_impl->cancel();
rmqt::Result<> cancelResult = timeout == bsls::TimeInterval(0)
? cancelFuture.blockResult()
: cancelFuture.waitResult(timeout);

if (timeout > bsls::TimeInterval(0)) {
BALL_LOG_INFO << "Cancel Called, result after " << timeout << ": "
<< cancelResult;
}
return cancelResult;
}

rmqt::Result<> Consumer::resume(const bsls::TimeInterval& timeout)
{
rmqt::Result<> cancelResult =
d_impl->cancel().waitResult(bsls::TimeInterval(0, 1000));
BALL_LOG_SET_CATEGORY("Consumer::cancel");
BALL_LOG_INFO << "Cancel Called, result after 1ms: " << cancelResult;
rmqt::Future<> resumeFuture = d_impl->resume();
rmqt::Result<> resumeResult = timeout == bsls::TimeInterval(0)
? resumeFuture.blockResult()
: resumeFuture.waitResult(timeout);

if (timeout > bsls::TimeInterval(0)) {
BALL_LOG_INFO << "Resume Called, result after " << timeout << ": "
<< resumeResult;
}
return resumeResult;
}

rmqt::Result<> Consumer::cancelAndDrain(
Expand Down
9 changes: 8 additions & 1 deletion src/rmq/rmqa/rmqa_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,14 @@ class Consumer {
/// \brief Tells the broker to stop delivering messages to this consumer.
/// it's still possible to nack/ack messages from callbacks after cancel is
/// called
void cancel();
rmqt::Result<>
cancel(const bsls::TimeInterval& timeout = bsls::TimeInterval(0));

/// \brief Tells the broker to restart consuming after cancelling the
/// consumer. Cancelled consumer will not be destroyed, but restarted with
/// the same consumer tag.
rmqt::Result<>
resume(const bsls::TimeInterval& timeout = bsls::TimeInterval(0));

/// \brief Tells the broker to stop delivering messages to this consumer.
/// \param timeout How long to wait for all delivered (unacked) messages
Expand Down
6 changes: 6 additions & 0 deletions src/rmq/rmqa/rmqa_consumerimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ rmqt::Future<> ConsumerImpl::drain()
bdlf::BindUtil::bind(&rmqamqp::ReceiveChannel::drain, d_channel)));
}

rmqt::Future<> ConsumerImpl::resume()
{
return rmqt::FutureUtil::flatten<void>(d_eventLoop.postF<rmqt::Future<> >(
bdlf::BindUtil::bind(&rmqamqp::ReceiveChannel::resume, d_channel)));
}

rmqt::Result<> ConsumerImpl::cancelAndDrain(const bsls::TimeInterval& timeout)
{
bsl::function<rmqt::Future<>()> fn =
Expand Down
2 changes: 2 additions & 0 deletions src/rmq/rmqa/rmqa_consumerimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class ConsumerImpl : public rmqp::Consumer,
/// messages
rmqt::Future<> drain() BSLS_KEYWORD_OVERRIDE;

rmqt::Future<> resume() BSLS_KEYWORD_OVERRIDE;

rmqt::Result<>
cancelAndDrain(const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE;

Expand Down
Loading