Skip to content

Commit

Permalink
Removed RefCell
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasLYang committed Sep 10, 2024
1 parent 30ddc9c commit f65af06
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 29 deletions.
7 changes: 3 additions & 4 deletions crates/turborepo-ui/src/wui/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cell::RefCell, sync::Arc};
use std::sync::Arc;

use async_graphql::{
http::GraphiQLSource, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject,
Expand Down Expand Up @@ -31,7 +31,6 @@ impl<'a> CurrentRun<'a> {
self.state
.lock()
.await
.borrow()
.tasks()
.iter()
.map(|(task, state)| Task {
Expand All @@ -45,7 +44,7 @@ impl<'a> CurrentRun<'a> {
/// We keep the state in a `Arc<Mutex<RefCell<T>>>` so both `Subscriber` and
/// `Query` can access it, with `Subscriber` mutating it and `Query` only
/// reading it.
type SharedState = Arc<Mutex<RefCell<WebUIState>>>;
pub(crate) type SharedState = Arc<Mutex<WebUIState>>;

pub struct Query {
state: SharedState,
Expand Down Expand Up @@ -76,7 +75,7 @@ async fn graphiql() -> impl IntoResponse {
pub async fn start_server(
rx: tokio::sync::mpsc::UnboundedReceiver<WebUIEvent>,
) -> Result<(), crate::Error> {
let state = Arc::new(Mutex::new(RefCell::new(WebUIState::default())));
let state = Arc::new(Mutex::new(WebUIState::default()));
let subscriber = Subscriber::new(rx);
tokio::spawn(subscriber.watch(state.clone()));

Expand Down
42 changes: 17 additions & 25 deletions crates/turborepo-ui/src/wui/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{cell::RefCell, collections::BTreeMap, sync::Arc};
use std::{collections::BTreeMap, sync::Arc};

use async_graphql::{Enum, SimpleObject};
use serde::Serialize;
use tokio::sync::Mutex;

use crate::{
tui::event::{CacheResult, TaskResult},
wui::event::WebUIEvent,
wui::{event::WebUIEvent, server::SharedState},
};

/// Subscribes to the Web UI events and updates the state
Expand All @@ -22,23 +22,23 @@ impl Subscriber {
pub async fn watch(
self,
// We use a tokio::sync::Mutex here because we want this future to be Send.
#[allow(clippy::type_complexity)] state: Arc<Mutex<RefCell<WebUIState>>>,
#[allow(clippy::type_complexity)] state: SharedState,
) {
let mut rx = self.rx;
while let Some(event) = rx.recv().await {
Self::add_message(&state, event).await;
}
}

async fn add_message(state: &Arc<Mutex<RefCell<WebUIState>>>, event: WebUIEvent) {
let state = state.lock().await;
async fn add_message(state: &Arc<Mutex<WebUIState>>, event: WebUIEvent) {
let mut state = state.lock().await;

match event {
WebUIEvent::StartTask {
task,
output_logs: _,
} => {
state.borrow_mut().tasks.insert(
state.tasks.insert(
task,
TaskState {
output: Vec::new(),
Expand All @@ -49,35 +49,27 @@ impl Subscriber {
);
}
WebUIEvent::TaskOutput { task, output } => {
state
.borrow_mut()
.tasks
.get_mut(&task)
.unwrap()
.output
.extend(output);
state.tasks.get_mut(&task).unwrap().output.extend(output);
}
WebUIEvent::EndTask { task, result } => {
state.borrow_mut().tasks.get_mut(&task).unwrap().status = TaskStatus::from(result);
state.tasks.get_mut(&task).unwrap().status = TaskStatus::from(result);
}
WebUIEvent::CacheStatus {
task,
result,
message,
} => {
let mut state_ref = state.borrow_mut();

if result == CacheResult::Hit {
state_ref.tasks.get_mut(&task).unwrap().status = TaskStatus::Cached;
state.tasks.get_mut(&task).unwrap().status = TaskStatus::Cached;
}
state_ref.tasks.get_mut(&task).unwrap().cache_result = Some(result);
state_ref.tasks.get_mut(&task).unwrap().cache_message = Some(message);
state.tasks.get_mut(&task).unwrap().cache_result = Some(result);
state.tasks.get_mut(&task).unwrap().cache_message = Some(message);
}
WebUIEvent::Stop => {
// TODO: stop watching
}
WebUIEvent::UpdateTasks { tasks } => {
state.borrow_mut().tasks = tasks
state.tasks = tasks
.into_iter()
.map(|task| {
(
Expand All @@ -93,7 +85,7 @@ impl Subscriber {
.collect();
}
WebUIEvent::RestartTasks { tasks } => {
state.borrow_mut().tasks = tasks
state.tasks = tasks
.into_iter()
.map(|task| {
(
Expand Down Expand Up @@ -164,7 +156,7 @@ mod test {
#[tokio::test]
async fn test_web_ui_state() -> Result<(), crate::Error> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let state = Arc::new(Mutex::new(RefCell::new(WebUIState::default())));
let state = Arc::new(Mutex::new(WebUIState::default()));
let subscriber = Subscriber::new(rx);

let sender = WebUISender::new(tx);
Expand All @@ -188,7 +180,7 @@ mod test {
// Run the subscriber blocking
subscriber.watch(state.clone()).await;

let state_handle = state.lock().await.borrow().clone();
let state_handle = state.lock().await.clone();
assert_eq!(state_handle.tasks().len(), 3);
assert_eq!(
state_handle.tasks().get("task2").unwrap().status,
Expand Down Expand Up @@ -248,7 +240,7 @@ mod test {
#[tokio::test]
async fn test_restart_tasks() -> Result<(), crate::Error> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let state = Arc::new(Mutex::new(RefCell::new(WebUIState::default())));
let state = Arc::new(Mutex::new(WebUIState::default()));
let subscriber = Subscriber::new(rx);

let sender = WebUISender::new(tx);
Expand All @@ -271,7 +263,7 @@ mod test {
// Run the subscriber blocking
subscriber.watch(state.clone()).await;

let state_handle = state.lock().await.borrow().clone();
let state_handle = state.lock().await.clone();
assert_eq!(state_handle.tasks().len(), 1);
assert_eq!(
state_handle.tasks().get("task").unwrap().status,
Expand Down

0 comments on commit f65af06

Please sign in to comment.