-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds resume functionality #33
Open
mvrsss
wants to merge
6
commits into
bloomberg:main
Choose a base branch
from
mvrsss:resume_consumer
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
0d2627f
Adds resume functionality
mvrsss 308e8d9
Adds a state diagram
mvrsss 98b3ed9
Moves state diagram to docs folder
mvrsss 6ba18e2
Update consumer-state-diagram.md
mvrsss 25fe41f
Merge branch 'main' into resume_consumer
mvrsss 89d5ae1
Merge branch 'main' into resume_consumer
willhoy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# Consumer states | ||
|
||
The diagram shows the state changes based on the Broker messages we receive. | ||
|
||
## `CANCELLING_BUT_RESUMING` | ||
|
||
New state `CANCELLING_BUT_RESUMING` was added to support resume after cancellation. | ||
|
||
- calling `resume()` in a `CANCELLING` state sets the state to `CANCELLING_BUT_RESUMING` which will enable consumer restart | ||
|
||
- previously cancelled consumer would be restarted after `resume()` call | ||
|
||
![image](https://github.com/mvrsss/rmqcpp/assets/60746841/54b025ee-3d7a-48de-bbcf-97ba00abf76b) | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
find_package(bsl REQUIRED) | ||
|
||
add_executable(resume_consumer | ||
resume.m.cpp | ||
) | ||
|
||
target_link_libraries(resume_consumer PUBLIC | ||
rmq | ||
bsl | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
#include <rmqa_connectionstring.h> | ||
#include <rmqa_consumer.h> | ||
#include <rmqa_producer.h> | ||
#include <rmqa_rabbitcontext.h> | ||
#include <rmqa_topology.h> | ||
#include <rmqa_vhost.h> | ||
#include <rmqp_producer.h> | ||
#include <rmqt_confirmresponse.h> | ||
#include <rmqt_exchange.h> | ||
#include <rmqt_message.h> | ||
#include <rmqt_result.h> | ||
#include <rmqt_vhostinfo.h> | ||
|
||
#include <ball_severityutil.h> | ||
#include <ball_streamobserver.h> | ||
|
||
#include <bsl_iostream.h> | ||
#include <bsl_memory.h> | ||
#include <bsl_optional.h> | ||
#include <bsl_vector.h> | ||
#include <bsls_atomic.h> | ||
|
||
#include <chrono> | ||
#include <string> | ||
#include <thread> | ||
|
||
using namespace BloombergLP; | ||
using namespace std::chrono_literals; | ||
|
||
namespace { | ||
BALL_LOG_SET_NAMESPACE_CATEGORY("RESUME.CONSUMER") | ||
|
||
bsls::AtomicInt consumedCount; | ||
} // namespace | ||
|
||
void configLog(ball::Severity::Level level) | ||
{ | ||
static ball::LoggerManagerConfiguration configuration; | ||
static bsl::shared_ptr<ball::StreamObserver> loggingObserver = | ||
bsl::make_shared<ball::StreamObserver>(&bsl::cout); | ||
|
||
configuration.setDefaultThresholdLevelsIfValid(level); | ||
|
||
static ball::LoggerManagerScopedGuard guard(configuration); | ||
ball::LoggerManager::singleton().registerObserver(loggingObserver, | ||
"stdout"); | ||
} | ||
|
||
int main(int argc, char** argv) | ||
{ | ||
if (argc < 2) { | ||
std::cerr << "USAGE: " << argv[0] << " <amqp uri>\n"; | ||
return 1; | ||
} | ||
|
||
configLog(ball::Severity::e_INFO); | ||
rmqa::RabbitContext rabbit; | ||
|
||
bsl::optional<rmqt::VHostInfo> vhostInfo = | ||
rmqa::ConnectionString::parse(argv[1]); | ||
|
||
if (!vhostInfo) { | ||
std::cerr << "Failed to parse connection string: " << argv[1] << "\n"; | ||
return 1; | ||
} | ||
|
||
// Returns immediately, setup performed on a different thread | ||
bsl::shared_ptr<rmqa::VHost> vhost = rabbit.createVHostConnection( | ||
"Sample code for a producer", // Connecion Name Visible in management UI | ||
vhostInfo.value()); | ||
|
||
// How many messages can be awaiting confirmation before `send` blocks | ||
const uint16_t maxOutstandingConfirms = 10; | ||
|
||
rmqa::Topology topology; | ||
rmqt::ExchangeHandle exch = topology.addExchange("exchange"); | ||
rmqt::QueueHandle queue = topology.addQueue("queue1"); | ||
|
||
topology.bind(exch, queue, "routing_key"); | ||
|
||
rmqt::Result<rmqa::Producer> producerResult = | ||
vhost->createProducer(topology, exch, maxOutstandingConfirms); | ||
if (!producerResult) { | ||
// A fatal error such as `exch` not being present in `topology` | ||
// A disconnection, or will never permanently fail an operation | ||
std::cerr << "Error creating connection: " << producerResult.error() | ||
<< "\n"; | ||
return 1; | ||
} | ||
|
||
bsl::shared_ptr<rmqa::Producer> producer = producerResult.value(); | ||
|
||
bsl::vector<uint8_t> messageVector = {5, 3, 1}; | ||
|
||
// `send` will block until a confirm is received if the | ||
// `maxOutstandingConfirms` limit is reached | ||
|
||
uint8_t i = 0; | ||
while (i++ < 10) { | ||
messageVector.push_back(i); | ||
rmqt::Message message( | ||
bsl::make_shared<bsl::vector<uint8_t> >(messageVector)); | ||
const rmqp::Producer::SendStatus sendResult = producer->send( | ||
message, | ||
"routing_key", | ||
[](const rmqt::Message& message, | ||
const bsl::string& routingKey, | ||
const rmqt::ConfirmResponse& response) { | ||
// https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed | ||
if (response.status() == rmqt::ConfirmResponse::ACK) { | ||
// Message is now guaranteed to be safe with the broker. | ||
// Now is the time to reply to the request, commit the | ||
// database transaction, or ack the RabbitMQ message which | ||
// triggered this publish | ||
} | ||
else { | ||
// Send error response, rollback transaction, nack message | ||
// and/or raise an alarm for investigation - your message is | ||
// not delivered to all (or perhaps any) of the queues it | ||
// was intended for. | ||
std::cerr << "Message not confirmed: " << message.guid() | ||
<< " for routing key " << routingKey << " " | ||
<< response << "\n"; | ||
} | ||
}); | ||
|
||
if (sendResult != rmqp::Producer::SENDING) { | ||
if (sendResult == rmqp::Producer::DUPLICATE) { | ||
std::cerr | ||
<< "Failed to send message: " << message.guid() | ||
<< " because an identical GUID is already outstanding\n"; | ||
} | ||
else { | ||
std::cerr << "Unknown send failure for message: " | ||
<< message.guid() << "\n"; | ||
} | ||
return 1; | ||
} | ||
} | ||
|
||
rmqt::Result<rmqa::Consumer> consumerResult = vhost->createConsumer( | ||
topology, | ||
queue, | ||
[](rmqp::MessageGuard& messageGuard) { | ||
std::this_thread::sleep_for(2000ms); | ||
messageGuard.ack(); | ||
++consumedCount; | ||
// std::cout << messageGuard.message() << std::endl; | ||
}, // Consumer callback invoked on each message | ||
"consumer-1.0", // Consumer Label (shows in Management UI) | ||
10 // prefetch count | ||
); | ||
|
||
if (!consumerResult) { | ||
return -1; | ||
} | ||
|
||
bsl::shared_ptr<rmqa::Consumer> consumer = consumerResult.value(); | ||
|
||
// Wait for consumer to start consuming | ||
std::this_thread::sleep_for(10000ms); | ||
std::cout << "Number of messages consumed before cancelling: " | ||
<< consumedCount << "\n"; | ||
consumer->cancel(); | ||
|
||
consumer->resume(); | ||
// Wait for consumer to consume messages after resuming | ||
std::this_thread::sleep_for(10000ms); | ||
|
||
while (i++ < 40) { | ||
messageVector.push_back(i); | ||
rmqt::Message message( | ||
bsl::make_shared<bsl::vector<uint8_t> >(messageVector)); | ||
const rmqp::Producer::SendStatus sendResult = producer->send( | ||
message, | ||
"routing_key", | ||
[](const rmqt::Message& message, | ||
const bsl::string& routingKey, | ||
const rmqt::ConfirmResponse& response) { | ||
// https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed | ||
if (response.status() == rmqt::ConfirmResponse::ACK) { | ||
// Message is now guaranteed to be safe with the broker. | ||
// Now is the time to reply to the request, commit the | ||
// database transaction, or ack the RabbitMQ message which | ||
// triggered this publish | ||
} | ||
else { | ||
// Send error response, rollback transaction, nack message | ||
// and/or raise an alarm for investigation - your message is | ||
// not delivered to all (or perhaps any) of the queues it | ||
// was intended for. | ||
std::cerr << "Message not confirmed: " << message.guid() | ||
<< " for routing key " << routingKey << " " | ||
<< response << "\n"; | ||
} | ||
}); | ||
|
||
if (sendResult != rmqp::Producer::SENDING) { | ||
if (sendResult == rmqp::Producer::DUPLICATE) { | ||
std::cerr | ||
<< "Failed to send message: " << message.guid() | ||
<< " because an identical GUID is already outstanding\n"; | ||
} | ||
else { | ||
std::cerr << "Unknown send failure for message: " | ||
<< message.guid() << "\n"; | ||
} | ||
return 1; | ||
} | ||
} | ||
|
||
std::cout << "Total number of messages consumed: " << consumedCount; | ||
|
||
// placeholder to keep channel open while resuming | ||
uint16_t placeholder = 0; | ||
std::cin >> placeholder; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to commit the image into the git repo, but we could make that a separate PR to facilitate merging this now