Skip to content

Commit

Permalink
Merge branch 'LeonHartley:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
RedKinda committed May 2, 2024
2 parents 95db328 + a2d3f19 commit 72eb166
Show file tree
Hide file tree
Showing 51 changed files with 3,186 additions and 469 deletions.
60 changes: 18 additions & 42 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion coerce/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "coerce"
description = "Async actor runtime and distributed systems framework"
license = "Apache-2.0"
version = "0.8.11"
version = "0.8.12"
authors = ["Leon Hartley <[email protected]>"]
edition = "2021"
readme = "README.md"
Expand All @@ -20,6 +20,7 @@ full = [
"actor-tracing",
"actor-tracing-info",
"client-auth-jwt",
"singleton",
]

remote = [
Expand Down Expand Up @@ -70,6 +71,8 @@ api = ["remote", "dep:axum", "dep:utoipa", "dep:utoipa-swagger-ui"]

client-auth-jwt = ["dep:jwt", "dep:hmac", "dep:sha2"]

singleton = []

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
Expand Down
6 changes: 3 additions & 3 deletions coerce/src/actor/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ impl ActorSystem {
Ok(_) => Ok(actor_ref),
Err(_e) => {
error!(
"actor not started, actor_id={}, type={}",
&id,
A::type_name()
actor_id = id.as_ref(),
actor_type = A::type_name(),
"actor not started",
);
Err(ActorRefErr::ActorStartFailed)
}
Expand Down
28 changes: 21 additions & 7 deletions coerce/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,27 +385,41 @@ fn on_context_dropped(
ActorStatus::Started => {
if let Some(true) = system_terminated {
debug!(
"actor (id={}, type={}) has stopped due to system shutdown",
actor_id, actor_type
actor_id = actor_id.as_ref(),
actor_type = actor_type,
"actor stopped due to system shutdown",
);
} else {
debug!("actor (id={}) has stopped unexpectedly", actor.0.actor_id());
debug!(
actor_id = actor.0.actor_id().as_ref(),
actor_type = actor_type,
"actor stopped unexpectedly"
);
}
}

ActorStatus::Stopping => {
if let Some(true) = system_terminated {
trace!("actor (id={}) has stopped due to system shutdown", actor_id,);
trace!(
actor_id = actor_id.as_ref(),
actor_type = actor_type,
"actor stopped due to system shutdown"
);
} else {
debug!(
"actor (id={}) was stopping but did not complete the stop procedure",
actor_id,
actor_id = actor_id.as_ref(),
actor_type = actor_type,
"actor was stopping but did not complete the stop procedure",
);
}
}

ActorStatus::Stopped => {
debug!("actor (id={}) stopped, context dropped", actor_id);
debug!(
actor_id = actor_id.as_ref(),
actor_type = actor_type,
"actor stopped, context dropped"
);
}
}

Expand Down
20 changes: 14 additions & 6 deletions coerce/src/actor/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl ActorLoop {
.new_context(system.clone(), Starting, actor_ref.clone().into())
.with_parent(parent_ref);

trace!("[{}] starting", ctx.full_path());
trace!(actor = ctx.full_path().as_ref(), "actor starting");

actor.started(&mut ctx).await;
ActorMetrics::incr_actor_created(A::type_name());
Expand All @@ -72,7 +72,7 @@ impl ActorLoop {

ctx.set_status(Started);

trace!("[{}] ready", ctx.full_path());
trace!(actor = ctx.full_path().as_ref(), "actor ready");

if let Some(on_start) = on_start.take() {
let _ = on_start.send(());
Expand Down Expand Up @@ -102,7 +102,11 @@ impl ActorLoop {
message_type = msg.name(),
);

trace!("[{}] received {}", ctx.full_path(), msg.name(),);
trace!(
actor = ctx.full_path().as_ref(),
msg_type = msg.name(),
"actor message received"
);

let handle_fut = msg.handle(&mut actor, &mut ctx);

Expand All @@ -111,15 +115,19 @@ impl ActorLoop {

handle_fut.await;

trace!("[{}] processed {}", ctx.full_path(), msg.name());
trace!(
actor = ctx.full_path().as_ref(),
msg_type = msg.name(),
"actor message processed"
);
}

if ctx.get_status() == &Stopping {
break;
}
}

trace!("[{}] stopping", ctx.full_path());
trace!(actor = ctx.full_path().as_ref(), "actor stopping");

ctx.set_status(Stopping);

Expand All @@ -141,7 +149,7 @@ async fn actor_stopped<A: Actor>(
if actor_type.is_tracked() {
if let Some(system) = system.take() {
if !system.is_terminated() {
trace!("de-registering actor {}", &actor_id);
trace!(actor_id = actor_id.as_ref(), "de-registering actor");

system
.scheduler()
Expand Down
27 changes: 27 additions & 0 deletions coerce/src/actor/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,33 @@ pub enum MessageWrapErr {
SerializationErr,
}

pub trait ToBytes {
fn to_bytes(self) -> Result<Vec<u8>, MessageWrapErr>;
}

pub trait FromBytes
where
Self: Sized,
{
fn from_bytes(buf: Vec<u8>) -> Result<Self, MessageUnwrapErr>;
}

impl<T: protobuf::Message> ToBytes for T {
fn to_bytes(self) -> Result<Vec<u8>, MessageWrapErr> {
self.write_to_bytes()
.map_err(|_| MessageWrapErr::SerializationErr)
}
}

impl<T: protobuf::Message> FromBytes for T
where
Self: Sized,
{
fn from_bytes(buf: Vec<u8>) -> Result<Self, MessageUnwrapErr> {
Self::parse_from_bytes(&buf).map_err(|_| MessageUnwrapErr::DeserializationErr)
}
}

impl Display for MessageWrapErr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match &self {
Expand Down
Loading

0 comments on commit 72eb166

Please sign in to comment.