Skip to content

Commit

Permalink
what
Browse files Browse the repository at this point in the history
  • Loading branch information
nothendev committed Oct 8, 2023
1 parent 9ba4d4e commit 6a66458
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 15 deletions.
50 changes: 41 additions & 9 deletions protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,26 @@ pub use indexmap;
pub use miette;
pub use nu_ansi_term as ansi;
pub use tracing;
/// A macro similar to `vec![$elem; $size]` which returns a boxed array.
///
/// ```rust
/// let _: Box<[u8; 1024]> = box_array![0; 1024];
/// ```
#[macro_export]
macro_rules! box_array {
($val:expr ; $len:expr) => {{
// Use a generic function so that the pointer cast remains type-safe
fn vec_to_boxed_array<T>(vec: Vec<T>) -> Box<[T; $len]> {
let boxed_slice = vec.into_boxed_slice();

let ptr = ::std::boxed::Box::into_raw(boxed_slice) as *mut [T; $len];

unsafe { Box::from_raw(ptr) }
}

vec_to_boxed_array(vec![$val; $len])
}};
}

pub async fn rwlock_set<T: 'static>(rwlock: &RwLock<T>, value: T) {
rwlock.set(value).await
Expand Down Expand Up @@ -131,7 +151,7 @@ use tokio::{
};

use crate::model::{
packets::{Packet, PacketContext, PluginMessage, SerializedPacket},
packets::{Packet, PacketContext, PluginMessage, SerializedPacket, SerializedPacketCompressed},
State,
};

Expand Down Expand Up @@ -195,24 +215,30 @@ impl PlayerNet {
let compressing__ = compressing.clone();
let recv_task = tokio::spawn(async move {
async {
let mut buf = BytesMut::new();
let mut buf = box_array![0u8; model::MAX_PACKET_DATA];

loop {
let read_bytes = read.read_buf(&mut buf).await?;
let read_bytes = read.read_exact(buf.as_mut()).await?;
if read_bytes == 0 {
return Ok::<(), crate::error::Error>(());
}
let spack = if compressing__.get_copy().await {
let compres = compressing__.get_copy().await;
let spack = if let Some(cmp) = compression.filter(|_| compres) {
trace!("[recv]compressing");
SerializedPacket::deserialize_compressing(compression)
let sp: SerializedPacket = SerializedPacketCompressed::deserialize
.parse(buf.as_ref())?
.into();
if sp.length < cmp {
warn!(?sp, ?cmp, "packet length was less than threshold");
}
sp
} else {
trace!("[recv]not compressing");
SerializedPacket::deserialize.parse(buf.as_ref())?
};
trace!(?buf, ?spack, "recving packet");
s_recv.send_async(spack).await?;
buf.clear();
buf.fill(0);
}
}
.await?;
Expand All @@ -237,9 +263,6 @@ impl PlayerNet {
error!(%peer_addr, ?error, "Disconnected (due to error)");
cancellator.cancel();
}
_ = cancellator.cancelled() => {
info!("cancellator'd");
}
}
});

Expand All @@ -259,6 +282,11 @@ impl PlayerNet {
pub async fn recv_packet<T: Packet + Deserialize<Context = PacketContext> + Debug>(
&self,
) -> Result<T> {
let tnov = std::any::type_name::<T>();
if self.recv.is_disconnected() {
trace!(packet=tnov, addr=%self.peer_addr, "receiving packet failed - disconnected");
return Err(crate::error::Error::ConnectionEnded);
}
let packet = self.recv.recv_async().await?;
let state = *self.state.read().await;
trace!(%self.peer_addr, ?packet, ?state, "Received packet");
Expand All @@ -269,6 +297,10 @@ impl PlayerNet {

/// Writes a packet.
pub fn send_packet<T: Packet + Serialize + Debug>(&self, packet: T) -> Result<()> {
if self.send.is_disconnected() {
trace!(?packet, addr=%self.peer_addr, "sending packet failed - disconnected");
return Err(crate::error::Error::ConnectionEnded);
}
let spack = SerializedPacket::new_ref(&packet)?;
trace!(?packet, addr=%self.peer_addr, ?spack, "Sending packet");
Ok(self.send.send(spack)?)
Expand Down
10 changes: 6 additions & 4 deletions protocol/src/model/packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,15 @@ impl Deserialize for SerializedPacketCompressed {
actual_data_length,
"decompressing serializedpacket"
);
let data_maybe = input.input.slice_from(input.offset..);
let real_data = if data_length > 0 {
let data_maybe = input
.input
.slice(input.offset..(input.offset + actual_data_length));
let real_data = if data_length <= 0 {
Bytes::copy_from_slice(data_maybe)
} else {
let real_data = Zlib::decode(data_maybe)?;
assert_eq!(real_data.len(), data_length);
real_data
} else {
Bytes::copy_from_slice(data_maybe)
};
let data_slice = real_data.deref();
let mut data_input = Input::new(&data_slice);
Expand Down
4 changes: 2 additions & 2 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,14 @@ fn on_login(rt: Res<TokioTasksRuntime>, mut ev: EventReader<PlayerLoginEvent>, q
}),
_ => Ok(()),
};
player.cancellator.cancel();
Ok(())
}
}
},
() = player.cancellator.cancelled() => {
error!("connection terminated");
Ok(()) }
Ok(())
}
}
});
}
Expand Down

0 comments on commit 6a66458

Please sign in to comment.