Skip to content

Commit

Permalink
fix compressing order
Browse files Browse the repository at this point in the history
  • Loading branch information
nothendev committed Oct 8, 2023
1 parent 88b1c61 commit f08fcd5
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 50 deletions.
5 changes: 4 additions & 1 deletion protocol/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ pub enum Error {
InvalidPacketId(i32),
#[error("Packet send error: {_0:?}")]
#[diagnostic(code(flume::error::send))]
Send(#[from] flume::SendError<SerializedPacket>),
Send(#[from] flume::SendError<(bool, SerializedPacket)>),
#[error("Packet send error: {_0:?}")]
#[diagnostic(code(flume::error::send))]
SendSingle(#[from] flume::SendError<SerializedPacket>),
#[error("Packet receive error: {_0:?}")]
#[diagnostic(code(flume::error::recv))]
Recv(#[from] flume::RecvError),
Expand Down
4 changes: 2 additions & 2 deletions protocol/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,10 @@ impl TaskContext {
let (output_tx, output_rx) = tokio::sync::oneshot::channel();
if self.update_run_tx.send(Box::new(move |ctx| {
if output_tx.send(runnable(ctx)).is_err() {
panic!("Failed to sent output from operation run on main thread back to waiting task");
tracing::error!("Failed to sent output from operation run on main thread back to waiting task");
}
})).is_err() {
panic!("Failed to send operation to be run on main thread");
tracing::error!("Failed to send operation to be run on main thread");
}
output_rx
.await
Expand Down
30 changes: 20 additions & 10 deletions protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ use std::{
fmt::Debug,
net::SocketAddr,
ops::{Deref, DerefMut},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

Expand All @@ -156,13 +159,13 @@ use crate::{

#[derive(Debug)]
pub struct PlayerNet {
pub send: flume::Sender<SerializedPacket>,
pub send: flume::Sender<(bool, SerializedPacket)>,
pub recv: flume::Receiver<SerializedPacket>,
pub peer_addr: SocketAddr,
pub local_addr: SocketAddr,
pub state: RwLock<State>,
pub compression: Option<usize>,
pub compressing: Arc<RwLock<bool>>,
pub compressing: Arc<AtomicBool>,
pub cancellator: CancellationToken,
}

Expand All @@ -186,14 +189,13 @@ impl PlayerNet {
let (s_recv, recv) = flume::unbounded();
let (send, r_send) = flume::unbounded();

let compressing = Arc::new(RwLock::new(false));
let compressing = Arc::new(AtomicBool::new(false));

let compressing_ = compressing.clone();
let send_task = tokio::spawn(async move {
let Err::<!, _>(e) = async {
loop {
let packet: SerializedPacket = r_send.recv_async().await?;
let compres = compressing_.get_copy().await;
let (compres, packet): (bool, SerializedPacket) = r_send.recv_async().await?;
let data = if compression.is_some_and(|_| compres) {
trace!("[send]compressing");
packet.serialize_compressing(compression)?
Expand All @@ -220,7 +222,7 @@ impl PlayerNet {
loop {
let bufslice = &buf[..];
let mut input = Input::new(&bufslice);
if let Some(packet) = match if compressing__.get_copy().await {
if let Some(packet) = match if compressing__.load(Ordering::SeqCst) {
SerializedPacketCompressed::deserialize
.parse_with(&mut input)
.map(SerializedPacket::from)
Expand Down Expand Up @@ -305,24 +307,32 @@ impl PlayerNet {
}

/// Writes a packet.
pub fn send_packet<T: Packet + Serialize + Debug>(&self, packet: T) -> Result<()> {
pub async fn send_packet<T: Packet + Serialize + Debug>(&self, packet: T) -> Result<()> {
if self.send.is_disconnected() {
trace!(?packet, addr=%self.peer_addr, "sending packet failed - disconnected");
return Err(crate::error::Error::ConnectionEnded);
}
let spack = SerializedPacket::new_ref(&packet)?;
trace!(?packet, addr=%self.peer_addr, ?spack, "Sending packet");
Ok(self.send.send(spack)?)
Ok(self
.send
.send_async((self.compressing.load(Ordering::SeqCst), spack))
.await?)
}

/// Sends a plugin message.
/// Equivalent to `self.send_packet(PluginMessage { channel, data: data.serialize() })`
pub fn plugin_message<T: Serialize + Debug>(&self, channel: Identifier, data: T) -> Result<()> {
pub async fn plugin_message<T: Serialize + Debug>(
&self,
channel: Identifier,
data: T,
) -> Result<()> {
trace!(?channel, ?data, %self.peer_addr, "Sending plugin message");
self.send_packet(PluginMessage {
channel,
data: data.serialize()?,
})
.await
}

/// Receives a packet and tries to deserialize it.
Expand Down
7 changes: 6 additions & 1 deletion protocol/src/model/packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl SerializedPacket {
Zlib::encode(&self.data)?.len() + VarInt(maybe_data_length as i32).length_of(),
)
} else {
trace!("packet was smaller than threshold {cmp}, sending uncompressed");
(0, self.length)
};
let pack = SerializedPacketCompressed {
Expand Down Expand Up @@ -270,7 +271,11 @@ impl Serialize for SerializedPacketCompressed {
.map_err(|_| Error::VarIntTooBig)?,
);
data_length.serialize_to(buf)?;
Compress((&self.id, &self.data), Zlib).serialize_to(buf)?;
if self.data_length > 0 {
Compress((&self.id, &self.data), Zlib).serialize_to(buf)?
} else {
(&self.id, &self.data).serialize_to(buf)?
}
Ok(())
}
}
Expand Down
89 changes: 53 additions & 36 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use oxcr_protocol::{
uuid::Uuid,
AsyncSet, PlayerN, PlayerNet, ProtocolPlugin,
};
use std::{net::SocketAddr, sync::Arc};
use std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc},
};
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
Expand Down Expand Up @@ -84,15 +87,17 @@ async fn login(net: Arc<PlayerNet>, cx: Arc<TaskContext>, ent_id: Entity) -> Res

net.send_packet(SetCompression {
threshold: VarInt::<i32>(threshold.try_into().unwrap()),
})?;
})
.await?;

net.compressing.set(true).await;
net.compressing.store(true, Ordering::SeqCst);
}

net.send_packet(LoginSuccess {
uuid,
username: name.clone(),
})?;
})
.await?;

net.state.set(State::Play).await;

Expand Down Expand Up @@ -168,28 +173,32 @@ async fn login(net: Arc<PlayerNet>, cx: Arc<TaskContext>, ent_id: Entity) -> Res
portal_cooldown: VarInt(20),
};

net.send_packet(login_play)?;
net.send_packet(login_play).await?;

let difficulty = cx
.run_on_main_thread(move |w| *w.world.resource::<DifficultySetting>())
.await;

net.send_packet(FeatureFlags {
feature_flags: Array::new(&[FeatureFlags::FEATURE_VANILLA]),
})?;
})
.await?;

net.plugin_message(Identifier::MINECRAFT_BRAND, "implodent")?;
net.plugin_message(Identifier::MINECRAFT_BRAND, "implodent")
.await?;

net.send_packet(ChangeDifficulty {
difficulty: difficulty.difficulty,
difficulty_locked: difficulty.is_locked,
})?;
})
.await?;

net.send_packet(PlayerAbilities {
flags: Abilities::FLYING,
flying_speed: 0.05f32,
fov_modifier: 0.1f32,
})?;
})
.await?;

net.send_packet(SetDefaultSpawnPosition {
location: Position {
Expand All @@ -198,7 +207,8 @@ async fn login(net: Arc<PlayerNet>, cx: Arc<TaskContext>, ent_id: Entity) -> Res
y: 50i8.into(),
},
angle: 0f32,
})?;
})
.await?;

Ok(())
}
Expand Down Expand Up @@ -242,10 +252,11 @@ async fn status(net: Arc<PlayerNet>) -> Result<()> {
},
..Default::default()
}),
})?;
})
.await?;

let PingRequest { payload } = net.recv_packet().await?;
net.send_packet(PongResponse { payload })?;
net.send_packet(PongResponse { payload }).await?;

Ok(())
}
Expand Down Expand Up @@ -324,30 +335,36 @@ fn on_login(rt: Res<TokioTasksRuntime>, mut ev: EventReader<PlayerLoginEvent>, q
let player = q.get(event.entity).unwrap().0.clone();
rt.spawn_background_task(move |task| async move {
let cx = Arc::new(task);
match when_the_miette(lifecycle(player.clone(), cx.clone(), event.entity).await) {
Ok(()) => Ok::<(), Error>(()),
Err(e) => {
error!(error=?e, ?player, "Disconnecting");

// ignore the result because we term the connection afterwards
let _ = match *(player.state.read().await) {
State::Login => player.send_packet(DisconnectLogin {
reason: Json(ChatComponent::String(ChatStringComponent {
text: format!("{e}"),
..Default::default()
})),
}),
State::Play => player.send_packet(DisconnectPlay {
reason: Json(ChatComponent::String(ChatStringComponent {
text: format!("{e}"),
..Default::default()
})),
}),
_ => Ok(()),
};

player.cancellator.cancel();

tokio::select! {
lfc = lifecycle(player.clone(), cx.clone(), event.entity) => match when_the_miette(lfc) {
Ok(()) => Ok::<(), Error>(()),
Err(e) => {
error!(error=?e, ?player, "Disconnecting");

// ignore the result because we term the connection afterwards
let _ = match *(player.state.read().await) {
State::Login => player.send_packet(DisconnectLogin {
reason: Json(ChatComponent::String(ChatStringComponent {
text: format!("{e}"),
..Default::default()
})),
}).await,
State::Play => player.send_packet(DisconnectPlay {
reason: Json(ChatComponent::String(ChatStringComponent {
text: format!("{e}"),
..Default::default()
})),
}).await,
_ => Ok(()),
};

player.cancellator.cancel();

Ok(())
}
},
_ = player.cancellator.cancelled() => {
info!("connection ended");
Ok(())
}
}
Expand Down

0 comments on commit f08fcd5

Please sign in to comment.