From 0eb23556812def818e469990198cd85028bf6432 Mon Sep 17 00:00:00 2001 From: nothendev Date: Sun, 8 Oct 2023 20:58:15 +0300 Subject: [PATCH] fix: implement framing in recv_task --- protocol/src/lib.rs | 51 +++++++++++++++++++++++++-------------------- rustfmt.toml | 1 + 2 files changed, 29 insertions(+), 23 deletions(-) create mode 100644 rustfmt.toml diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index e42c897..f4363b1 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -218,33 +218,38 @@ impl PlayerNet { async { let mut buf = BytesMut::new(); - let mut first = true; - while read.read_buf(&mut buf).await? != 0 { - let compres = compressing__.get_copy().await; + loop { let bufslice = &buf[..]; let mut input = Input::new(&bufslice); - let spack = if let Some(cmp) = compression.filter(|_| compres) { - trace!("[recv]compressing"); - let sp: SerializedPacket = SerializedPacketCompressed::deserialize - .parse_with(&mut input)? - .into(); - if sp.length < cmp { - warn!(?sp, ?cmp, "packet length was less than threshold"); - } - sp + if let Some(packet) = match (if compressing__.get_copy().await { + SerializedPacketCompressed::deserialize + .parse_with(&mut input) + .map(SerializedPacket::from) } else { - trace!("[recv]not compressing"); - SerializedPacket::deserialize.parse_with(&mut input)? - }; - let read_bytes = buf.len(); - let unread_bytes = read_bytes - input.offset; - trace!(buf=%format!("{:#x?}", &buf[unread_bytes..]), ?input.offset, ?read_bytes, ?unread_bytes, ?spack, "recving packet"); - let offset = input.offset; - drop((bufslice, input)); - s_recv.send_async(spack).await?; - buf = buf.split_off(offset); + SerializedPacket::deserialize.parse_with(&mut input) + }) { + Ok(packet) => Some(packet), + Err(crate::error::Error::Ser( + crate::ser::SerializationError::UnexpectedEof { .. }, + )) => None, + Err(other) => return Err(other), + } { + let offset = input.offset; + drop((bufslice, input)); + buf = buf.split_off(offset); + s_recv.send_async(packet).await?; + } else { + if read.read_buf(&mut buf).await? == 0 { + if buf.is_empty() { + break; + } else { + return Err(crate::error::Error::ConnectionEnded); + } + } + } } - Ok::<(), crate::error::Error>(()) + + Ok(()) } .await?; diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..3a26366 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +edition = "2021"