diff --git a/README.md b/README.md index fa984eb..b8ae4ff 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ These instructions assume you are using the docker compose configuration and the (This closely replicates what an INTERSECT message looks like, though the only thing you really need to check is that your conf.yaml's `topic_prefix` value starts with the value for `headers.source`) -7) Click "publish_message" +7) Click "publish_message". At this point the message should show up in the logs for both `http-2-broker` and `broker-2-http` . 8) On localhost:15673 `Queues and streams` section, blow up `Get messages`, set Ack mode to `Automatic ack`, click on `Get messages`, you should see your payload from step 6. Congratulations, you have successfully simulated a publisher and a subscriber being able to talk to each other across 2 separate message brokers. diff --git a/broker-2-http/src/amqp_consumer.rs b/broker-2-http/src/amqp_consumer.rs index 37ce9f5..b412ee3 100644 --- a/broker-2-http/src/amqp_consumer.rs +++ b/broker-2-http/src/amqp_consumer.rs @@ -1,103 +1,185 @@ use amqprs::{ - channel::{BasicAckArguments, Channel}, - consumer::AsyncConsumer, - BasicProperties, Deliver, + channel::{ + BasicAckArguments, BasicCancelArguments, BasicConsumeArguments, Channel, ConsumerMessage, + QueueBindArguments, QueueDeclareArguments, + }, + connection::Connection, }; use std::sync::Arc; - -use axum::async_trait; +use uuid::Uuid; use crate::broadcaster::Broadcaster; -use intersect_ingress_proxy_common::intersect_messaging::{ - make_eventsource_data, should_message_passthrough, +use intersect_ingress_proxy_common::intersect_messaging::INTERSECT_MESSAGE_EXCHANGE; +use intersect_ingress_proxy_common::protocols::amqp::{get_channel, get_connection, make_exchange}; +use intersect_ingress_proxy_common::{ + configuration::BrokerSettings, + intersect_messaging::{make_eventsource_data, should_message_passthrough}, + signals::wait_for_os_signal, }; -/// Consumer for messages. If a message is determined to be from the system, -/// the message is broadcast using the broadcaster to all appropriate channels. -pub struct AmqpConsumer { - auto_ack: bool, - topic_prefix: String, +pub async fn broker_consumer_loop( + config_broker: BrokerSettings, + config_topic: String, broadcaster: Arc, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + broker_consumer_loop_inner(config_broker, config_topic, broadcaster).await + }) } -impl AmqpConsumer { - pub fn new(auto_ack: bool, topic_prefix: String, broadcaster: Arc) -> Self { - Self { - auto_ack, - topic_prefix, - broadcaster, +async fn broker_consumer_loop_inner( + config_broker: BrokerSettings, + config_topic: String, + broadcaster: Arc, +) { + let mut connected_once = false; + + 'connection_loop: loop { + let connection = get_connection(&config_broker, if connected_once { 0 } else { 10 }).await; + let channel = get_channel(&connection).await; + connected_once = true; + + make_exchange(&channel) + .await + .expect("Could not declare exchange on channel"); + + // we'll use a persistent queue named "broker-2-http", as there should only be one broker-2-http deployment per System + // TODO - note that we should probably name queues larger than 127 characters with a hashed key + let (queue_name, _, _) = channel + .queue_declare(QueueDeclareArguments::durable_client_named("broker-2-http")) + .await + .expect("Couldn't declare queue") + .expect("didn't get correct args back from queue declaration"); + + // listen for every single message on the exchange, we must do this due to the way userspace messages work + channel + .queue_bind(QueueBindArguments::new( + &queue_name, + INTERSECT_MESSAGE_EXCHANGE, + "#", + )) + .await + .expect("Couldn't bind to queue"); + + // Do NOT automatically acknowledge messages, we may not be able to forward them. + let args = BasicConsumeArguments::new(&queue_name, &Uuid::new_v4().to_string()) + .manual_ack(true) // only ack messages we should actually publish, we will nack the others + .finish(); + + let (consumer_tag, mut messages_rx) = channel.basic_consume_rx(args).await.unwrap(); + loop { + tokio::select! { + // OS kill signal + _ = wait_for_os_signal() => { + // attempt cleanup before terminating + tracing::warn!("Received terminate signal from OS, attempting to gracefully disconnect from AMQP broker..."); + cleanup(consumer_tag, channel, connection).await; + + break 'connection_loop; + }, + consumer_result = messages_rx.recv() => { + match consumer_result { + Some(msg) => consume_message(msg, &channel, &config_topic, broadcaster.clone()).await, + None => { + tracing::warn!("Messages channel was suddenly closed, will try to reconnect"); + break; + }, + } + } + } } + + // if we reach this, the channel has been closed from the messages_rx object (most likely from a broker disconnect), so we will clean up and then attempt reconnection + cleanup(consumer_tag, channel, connection).await; } } -#[async_trait] -impl AsyncConsumer for AmqpConsumer { - async fn consume( - &mut self, - channel: &Channel, - deliver: Deliver, - _basic_properties: BasicProperties, - content: Vec, - ) { - // This is the major difference between our implementations and what the SDK does - we don't necessarily want to ACK (but by default we will) - // we will always manually ACK unless nobody was available to listen to our message, in which case we should NACK and requeue. - let mut should_ack = true; - if deliver.redelivered() { - tracing::warn!("message was redelivered"); - } - tracing::debug!("consume delivery {}", deliver); - match String::from_utf8(content) { - Ok(utf8_data) => { - tracing::debug!("raw message data: {}", &utf8_data); - match should_message_passthrough(&utf8_data, &self.topic_prefix) { - Err(e) => { - tracing::error!(error = ?e, "message is valid UTF-8 but not INTERSECT JSON"); - } - Ok(false) => { - tracing::warn!( - "message source is not from this system, will not broadcast it" - ); - } - Ok(true) => { - let topic = deliver.routing_key(); - let event = make_eventsource_data(topic, &utf8_data); - tracing::debug!("consume delivery {} , data: {}", deliver, event,); - // TODO handle this better later, see broadcast() documentation for details. - if self.broadcaster.broadcast(&event) == 0 { - tracing::warn!("Broadcaster did not broadcast to anybody"); - should_ack = false; - } +/// domain logic for handling a message from the broker +async fn consume_message( + msg: ConsumerMessage, + channel: &Channel, + config_topic: &str, + broadcaster: Arc, +) { + let deliver = msg.deliver.unwrap(); + let content = msg.content.unwrap(); + + // This is the major difference between our implementations and what the SDK does - we don't necessarily want to ACK (but by default we will) + // we will always manually ACK unless nobody was available to listen to our message, in which case we should NACK and requeue. + let mut should_ack = true; + if deliver.redelivered() { + tracing::warn!("message was redelivered"); + } + tracing::debug!("consume delivery {}", deliver); + match String::from_utf8(content) { + Ok(utf8_data) => { + tracing::debug!("raw message data: {}", &utf8_data); + match should_message_passthrough(&utf8_data, config_topic) { + Err(e) => { + tracing::error!(error = ?e, "message is valid UTF-8 but not INTERSECT JSON"); + } + Ok(false) => { + tracing::warn!("message source is not from this system, will not broadcast it"); + } + Ok(true) => { + let topic = deliver.routing_key(); + let event = make_eventsource_data(topic, &utf8_data); + tracing::debug!("consume delivery {} , data: {}", deliver, event,); + // TODO handle this better later, see broadcast() documentation for details. + if broadcaster.broadcast(&event) == 0 { + tracing::warn!("Broadcaster did not broadcast to anybody"); + should_ack = false; } } } - Err(e) => { - tracing::error!(error = ?e, "message data is not UTF-8, cannot be forwarded over SSE"); - } } + Err(e) => { + tracing::error!(error = ?e, "message data is not UTF-8, cannot be forwarded over SSE"); + } + } - if !self.auto_ack { - if should_ack { - tracing::debug!("ack to delivery {}", deliver); - let args = BasicAckArguments::new(deliver.delivery_tag(), false); - match channel.basic_ack(args).await { - Ok(_) => {} - Err(e) => tracing::error!(error = ?e, "manual ack did not work"), - }; - } else { - // We don't acknowledge or reject the message, so we immediately get the message back. - tracing::warn!( - "Some clients probably did not get delivery {}, not acknowledging the message", - deliver, - ); - // TODO - if we're able to determine SPECIFIC clients who did/did not get it, we may want to explicitly reject the message. - // match channel - // .basic_reject(BasicRejectArguments::new(deliver.delivery_tag(), true)) - // .await - // { - // Ok(_) => {} - // Err(e) => tracing::error!(error = ?e, "manual nack did not work"), - // }; - } + if should_ack { + tracing::debug!("ack to delivery {}", deliver); + let args = BasicAckArguments::new(deliver.delivery_tag(), false); + match channel.basic_ack(args).await { + Ok(_) => {} + Err(e) => tracing::error!(error = ?e, "manual ack did not work"), + }; + } else { + // We don't acknowledge or reject the message, so we immediately get the message back. + tracing::warn!( + "Some clients probably did not get delivery {}, not acknowledging the message", + deliver, + ); + // TODO - if we're able to determine SPECIFIC clients who did/did not get it, we may want to explicitly reject the message. + // match channel + // .basic_reject(BasicRejectArguments::new(deliver.delivery_tag(), true)) + // .await + // { + // Ok(_) => {} + // Err(e) => tracing::error!(error = ?e, "manual nack did not work"), + // }; + } +} + +/// call this if we were instructed to shut down or our channel suddenly disconnected. +async fn cleanup(consumer_tag: String, channel: Channel, connection: Connection) { + if let Err(e) = channel + .basic_cancel(BasicCancelArguments::new(&consumer_tag)) + .await + { + tracing::error!(error = ?e, "could not send cancel message"); + }; + match channel.close().await { + Ok(_) => tracing::debug!("closed channel"), + Err(e) => { + tracing::error!(error = ?e, "Could not close channel") + } + } + match connection.close().await { + Ok(_) => tracing::debug!("closeed connection"), + Err(e) => { + tracing::error!(error = ?e, "Could not close connection") } } } diff --git a/broker-2-http/src/amqp_manager.rs b/broker-2-http/src/amqp_manager.rs deleted file mode 100644 index 411d7e5..0000000 --- a/broker-2-http/src/amqp_manager.rs +++ /dev/null @@ -1,81 +0,0 @@ -use amqprs::{ - channel::{BasicConsumeArguments, Channel, QueueBindArguments, QueueDeclareArguments}, - connection::Connection, -}; -use std::sync::Arc; -use uuid::Uuid; - -use crate::{amqp_consumer::AmqpConsumer, broadcaster::Broadcaster, configuration::Settings}; -use intersect_ingress_proxy_common::intersect_messaging::INTERSECT_MESSAGE_EXCHANGE; -use intersect_ingress_proxy_common::protocols::amqp::{get_channel, get_connection, make_exchange}; - -/// AmqpManager is meant to be a long-living object which contains connection/channel data of a consumer. -#[derive(Clone)] -pub struct AmqpManager { - connection: Connection, - channel: Channel, -} - -impl AmqpManager { - /// Create a new AmqpManager. We need to provide a Broadcaster because this object manages the AmqpConsumer on its own. - pub async fn new(configuration: &Settings, broadcaster: Arc) -> Self { - let connection = get_connection(&configuration.broker, 10).await; - let channel = get_channel(&connection).await; - - // TODO - we probably don't need to declare this exchange, since we are always subscribing to messages on this channel. - make_exchange(&channel) - .await - .expect("Could not declare exchange on channel"); - - // we'll use a persistent queue named "broker-2-http", as there should only be one broker-2-http deployment per System - // TODO - note that we should probably name queues larger than 127 characters with a hashed key - let (queue_name, _, _) = channel - .queue_declare(QueueDeclareArguments::durable_client_named("broker-2-http")) - .await - .expect("Couldn't declare queue") - .expect("didn't get correct args back from queue declaration"); - - // listen for every single message on the exchange, we must do this due to the way userspace messages work - channel - .queue_bind(QueueBindArguments::new( - &queue_name, - INTERSECT_MESSAGE_EXCHANGE, - "#", - )) - .await - .expect("Couldn't bind to queue"); - - // Do NOT automatically acknowledge messages, we may not be able to forward them. - let args = BasicConsumeArguments::new(&queue_name, &Uuid::new_v4().to_string()) - .manual_ack(true) // only ack messages we should actually publish, we will nack the others - .finish(); - - channel - .basic_consume( - AmqpConsumer::new(args.no_ack, configuration.topic_prefix.clone(), broadcaster), - args, - ) - .await - .unwrap(); - Self { - connection, - channel, - } - } - - /// Cleanup for the AmqpManager. The manager cannot be used after calling this. - pub async fn destruct(self) { - match self.channel.close().await { - Ok(_) => tracing::debug!("closed channel"), - Err(e) => { - tracing::error!(error = ?e, "Could not close channel") - } - } - match self.connection.close().await { - Ok(_) => tracing::debug!("closeed connection"), - Err(e) => { - tracing::error!(error = ?e, "Could not close connection") - } - } - } -} diff --git a/broker-2-http/src/broadcaster.rs b/broker-2-http/src/broadcaster.rs index cab0b05..88973b8 100644 --- a/broker-2-http/src/broadcaster.rs +++ b/broker-2-http/src/broadcaster.rs @@ -37,6 +37,7 @@ impl Broadcaster { /// 1) getting SSE client information so we know who DID get it, then deduce who didn't from the config list /// 2) for each client who DIDN'T, NACK the message on a special exchange (dedicated to these clients). /// 3) Somehow transfer these messages over to the other message broker, make the messages their responsibility. + /// /// Once the messages are on the other message broker, broker-2-http and http-2-broker don't need to care, handling them will be the SDK's job. pub fn broadcast(&self, event: &str) -> usize { self.fanout.send(Event::default().data(event)).unwrap_or(0) diff --git a/broker-2-http/src/lib.rs b/broker-2-http/src/lib.rs index 52c3366..cff0e3d 100644 --- a/broker-2-http/src/lib.rs +++ b/broker-2-http/src/lib.rs @@ -1,6 +1,5 @@ pub mod amqp_consumer; -pub mod amqp_manager; pub mod broadcaster; pub mod configuration; pub mod routes; -pub mod startup; +pub mod webapp; diff --git a/broker-2-http/src/main.rs b/broker-2-http/src/main.rs index e2035ee..6d1da67 100644 --- a/broker-2-http/src/main.rs +++ b/broker-2-http/src/main.rs @@ -1,4 +1,7 @@ -use broker_2_http::{configuration::Settings, startup::Application}; +use broker_2_http::{ + amqp_consumer::broker_consumer_loop, broadcaster::Broadcaster, configuration::Settings, + webapp::WebApplication, +}; use intersect_ingress_proxy_common::configuration::get_configuration; use intersect_ingress_proxy_common::telemetry::{ @@ -22,9 +25,19 @@ async fn main() -> anyhow::Result<()> { init_subscriber(subscriber); } - let (application, amqp_manager) = Application::build(configuration).await?; + let broadcaster = Broadcaster::new(); + + let application = WebApplication::build(&configuration, broadcaster.clone()).await?; + + let broker_join_handle = broker_consumer_loop( + configuration.broker.clone(), + configuration.topic_prefix.clone(), + broadcaster.clone(), + ) + .await; application.run_until_stopped().await?; tracing::warn!("Application shutting down, please wait for cleanups..."); - amqp_manager.destruct().await; + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + broker_join_handle.abort(); Ok(()) } diff --git a/broker-2-http/src/routes/subscribe.rs b/broker-2-http/src/routes/subscribe.rs index 17eff40..258a389 100644 --- a/broker-2-http/src/routes/subscribe.rs +++ b/broker-2-http/src/routes/subscribe.rs @@ -11,11 +11,11 @@ use secrecy::ExposeSecret; use std::convert::Infallible; use std::sync::Arc; -use crate::startup::ApplicationState; +use crate::webapp::WebApplicationState; use intersect_ingress_proxy_common::signals::wait_for_os_signal; fn sse_response( - app_state: Arc, + app_state: Arc, ) -> Sse>> { let mut rx = app_state.broadcaster.add_client(); @@ -48,7 +48,7 @@ fn sse_response( /// https://github.com/tokio-rs/axum/discussions/1670 /// https://github.com/tokio-rs/axum/discussions/2264 pub async fn sse_handler( - State(app_state): State>, + State(app_state): State>, TypedHeader(authorization): TypedHeader>, ) -> impl IntoResponse { if authorization.username() != app_state.username diff --git a/broker-2-http/src/startup.rs b/broker-2-http/src/webapp.rs similarity index 77% rename from broker-2-http/src/startup.rs rename to broker-2-http/src/webapp.rs index d15c0c6..3cb0f4f 100644 --- a/broker-2-http/src/startup.rs +++ b/broker-2-http/src/webapp.rs @@ -14,7 +14,6 @@ use tower_http::{ use tracing::Level; use crate::{ - amqp_manager::AmqpManager, broadcaster::Broadcaster, configuration::Settings, routes::{health_check::health_check, not_found::handler_404, subscribe::sse_handler}, @@ -23,7 +22,7 @@ use crate::{ use intersect_ingress_proxy_common::signals::wait_for_os_signal; /// This is state that can be accessed by any endpoint on the server. -pub struct ApplicationState { +pub struct WebApplicationState { /// this broadcaster gets messages published to it from one source and can publish many messages from it pub broadcaster: Arc, /// basic auth username @@ -32,14 +31,17 @@ pub struct ApplicationState { pub password: Secret, } -type AppServer = Serve; -pub struct Application { +type WebAppServer = Serve; +pub struct WebApplication { pub port: u16, - pub server: AppServer, + pub server: WebAppServer, } -impl Application { - pub async fn build(configuration: Settings) -> Result<(Self, AmqpManager), anyhow::Error> { +impl WebApplication { + pub async fn build( + configuration: &Settings, + broadcaster: Arc, + ) -> Result { let address = format!( "{}:{}", if configuration.production { @@ -51,11 +53,11 @@ impl Application { ); let listener = TcpListener::bind(address).await?; let port = listener.local_addr().unwrap().port(); - let (server, amqp_manager) = run(listener, &configuration).await?; + let server = run(listener, configuration, broadcaster).await?; tracing::info!("Web server is running on port {}", port); - Ok((Self { port, server }, amqp_manager)) + Ok(Self { port, server }) } pub fn port(&self) -> u16 { @@ -73,7 +75,8 @@ impl Application { async fn run( listener: TcpListener, configuration: &Settings, -) -> Result<(AppServer, AmqpManager), anyhow::Error> { + broadcaster: Arc, +) -> Result { let middleware = ServiceBuilder::new() .set_x_request_id(MakeRequestUuid) .layer( @@ -87,10 +90,7 @@ async fn run( ) .propagate_x_request_id(); - let broadcaster = Broadcaster::new(); - // TODO this is a slightly awkward way to persist the AMQPManager - let amqp_manager = AmqpManager::new(configuration, broadcaster.clone()).await; - let app_state = Arc::new(ApplicationState { + let app_state = Arc::new(WebApplicationState { broadcaster, username: configuration.username.clone(), password: configuration.password.clone(), @@ -98,11 +98,12 @@ async fn run( let app = Router::new() .route("/healthcheck", get(health_check)) - .route("/", get(sse_handler)) + .route("/subscribe", get(sse_handler)) + //.route("/publish", post(publish)) .layer(middleware) .with_state(app_state) .fallback(handler_404); let server = axum::serve(listener, app); - Ok((server, amqp_manager)) + Ok(server) } diff --git a/http-2-broker/conf.yaml b/http-2-broker/conf.yaml index 2aa18a9..8d26ebf 100644 --- a/http-2-broker/conf.yaml +++ b/http-2-broker/conf.yaml @@ -1,6 +1,6 @@ # local development config file for the SSE client other_proxy: - url: "http://localhost:8080" + url: "http://localhost:8080/subscribe" username: dummy_username password: dummy_password broker: diff --git a/http-2-broker/src/main.rs b/http-2-broker/src/main.rs index e21c106..ea26705 100644 --- a/http-2-broker/src/main.rs +++ b/http-2-broker/src/main.rs @@ -4,6 +4,7 @@ use std::time::Duration; use amqprs::{channel::BasicPublishArguments, connection::Connection, BasicProperties}; use futures::StreamExt; use reqwest_eventsource::{Event, EventSource}; +use tokio::sync::Mutex; use http_2_broker::configuration::Settings; use intersect_ingress_proxy_common::configuration::get_configuration; @@ -21,10 +22,10 @@ use secrecy::ExposeSecret; /// Data we need to share across multiple closures. struct BrokerData { - pub connection: Connection, + pub connection: Mutex, } -async fn send_message(message: String, broker_data: Arc) { +async fn send_message(configuration: &Settings, message: String, broker_data: Arc) { let es_data_result = extract_eventsource_data(&message); if es_data_result.is_err() { return; @@ -38,10 +39,15 @@ async fn send_message(message: String, broker_data: Arc) { return; } + let mut connection = broker_data.connection.lock().await; + if !connection.is_open() { + *connection = get_connection(&configuration.broker, 0).await; + } + // TODO - we'd ideally like to potentially reuse the channel instead of closing it every time // see https://github.com/rdoetjes/rabbit_systeminfo/blob/master/systeminfo/src/main.rs#L84 as an example // we NEED to explicitly close the channel, or else problems on the broker may develop - let channel = get_channel(&broker_data.connection).await; + let channel = get_channel(&connection).await; let args = BasicPublishArguments::new(INTERSECT_MESSAGE_EXCHANGE, &topic); // NOTE: the publish() function takes ownership of the string, if you don't care about logging then don't clone @@ -95,7 +101,7 @@ async fn event_source_loop(configuration: &Settings, broker_data: Arc { - send_message(message.data, broker_data.clone()).await; + send_message(configuration, message.data, broker_data.clone()).await; }, Err(err) => { // will happen if we can't connect to the endpoint OR if the endpoint drops us @@ -155,7 +161,9 @@ pub async fn main() { } } - let broker_data = Arc::new(BrokerData { connection }); + let broker_data = Arc::new(BrokerData { + connection: Mutex::new(connection), + }); let rc = event_source_loop(&configuration, broker_data.clone()).await;