diff --git a/crates/turborepo-ui/src/wui/server.rs b/crates/turborepo-ui/src/wui/server.rs index ddb9bad6b1110..81858b4b579cb 100644 --- a/crates/turborepo-ui/src/wui/server.rs +++ b/crates/turborepo-ui/src/wui/server.rs @@ -1,4 +1,4 @@ -use std::{cell::RefCell, sync::Arc}; +use std::sync::Arc; use async_graphql::{ http::GraphiQLSource, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject, @@ -31,7 +31,6 @@ impl<'a> CurrentRun<'a> { self.state .lock() .await - .borrow() .tasks() .iter() .map(|(task, state)| Task { @@ -45,7 +44,7 @@ impl<'a> CurrentRun<'a> { /// We keep the state in a `Arc>>` so both `Subscriber` and /// `Query` can access it, with `Subscriber` mutating it and `Query` only /// reading it. -type SharedState = Arc>>; +pub(crate) type SharedState = Arc>; pub struct Query { state: SharedState, @@ -76,7 +75,7 @@ async fn graphiql() -> impl IntoResponse { pub async fn start_server( rx: tokio::sync::mpsc::UnboundedReceiver, ) -> Result<(), crate::Error> { - let state = Arc::new(Mutex::new(RefCell::new(WebUIState::default()))); + let state = Arc::new(Mutex::new(WebUIState::default())); let subscriber = Subscriber::new(rx); tokio::spawn(subscriber.watch(state.clone())); diff --git a/crates/turborepo-ui/src/wui/subscriber.rs b/crates/turborepo-ui/src/wui/subscriber.rs index e8db044c52b04..c8196c2ba4aba 100644 --- a/crates/turborepo-ui/src/wui/subscriber.rs +++ b/crates/turborepo-ui/src/wui/subscriber.rs @@ -1,4 +1,4 @@ -use std::{cell::RefCell, collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc}; use async_graphql::{Enum, SimpleObject}; use serde::Serialize; @@ -6,7 +6,7 @@ use tokio::sync::Mutex; use crate::{ tui::event::{CacheResult, TaskResult}, - wui::event::WebUIEvent, + wui::{event::WebUIEvent, server::SharedState}, }; /// Subscribes to the Web UI events and updates the state @@ -22,7 +22,7 @@ impl Subscriber { pub async fn watch( self, // We use a tokio::sync::Mutex here because we want this future to be Send. - #[allow(clippy::type_complexity)] state: Arc>>, + #[allow(clippy::type_complexity)] state: SharedState, ) { let mut rx = self.rx; while let Some(event) = rx.recv().await { @@ -30,15 +30,15 @@ impl Subscriber { } } - async fn add_message(state: &Arc>>, event: WebUIEvent) { - let state = state.lock().await; + async fn add_message(state: &Arc>, event: WebUIEvent) { + let mut state = state.lock().await; match event { WebUIEvent::StartTask { task, output_logs: _, } => { - state.borrow_mut().tasks.insert( + state.tasks.insert( task, TaskState { output: Vec::new(), @@ -49,35 +49,27 @@ impl Subscriber { ); } WebUIEvent::TaskOutput { task, output } => { - state - .borrow_mut() - .tasks - .get_mut(&task) - .unwrap() - .output - .extend(output); + state.tasks.get_mut(&task).unwrap().output.extend(output); } WebUIEvent::EndTask { task, result } => { - state.borrow_mut().tasks.get_mut(&task).unwrap().status = TaskStatus::from(result); + state.tasks.get_mut(&task).unwrap().status = TaskStatus::from(result); } WebUIEvent::CacheStatus { task, result, message, } => { - let mut state_ref = state.borrow_mut(); - if result == CacheResult::Hit { - state_ref.tasks.get_mut(&task).unwrap().status = TaskStatus::Cached; + state.tasks.get_mut(&task).unwrap().status = TaskStatus::Cached; } - state_ref.tasks.get_mut(&task).unwrap().cache_result = Some(result); - state_ref.tasks.get_mut(&task).unwrap().cache_message = Some(message); + state.tasks.get_mut(&task).unwrap().cache_result = Some(result); + state.tasks.get_mut(&task).unwrap().cache_message = Some(message); } WebUIEvent::Stop => { // TODO: stop watching } WebUIEvent::UpdateTasks { tasks } => { - state.borrow_mut().tasks = tasks + state.tasks = tasks .into_iter() .map(|task| { ( @@ -93,7 +85,7 @@ impl Subscriber { .collect(); } WebUIEvent::RestartTasks { tasks } => { - state.borrow_mut().tasks = tasks + state.tasks = tasks .into_iter() .map(|task| { ( @@ -164,7 +156,7 @@ mod test { #[tokio::test] async fn test_web_ui_state() -> Result<(), crate::Error> { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let state = Arc::new(Mutex::new(RefCell::new(WebUIState::default()))); + let state = Arc::new(Mutex::new(WebUIState::default())); let subscriber = Subscriber::new(rx); let sender = WebUISender::new(tx); @@ -188,7 +180,7 @@ mod test { // Run the subscriber blocking subscriber.watch(state.clone()).await; - let state_handle = state.lock().await.borrow().clone(); + let state_handle = state.lock().await.clone(); assert_eq!(state_handle.tasks().len(), 3); assert_eq!( state_handle.tasks().get("task2").unwrap().status, @@ -248,7 +240,7 @@ mod test { #[tokio::test] async fn test_restart_tasks() -> Result<(), crate::Error> { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let state = Arc::new(Mutex::new(RefCell::new(WebUIState::default()))); + let state = Arc::new(Mutex::new(WebUIState::default())); let subscriber = Subscriber::new(rx); let sender = WebUISender::new(tx); @@ -271,7 +263,7 @@ mod test { // Run the subscriber blocking subscriber.watch(state.clone()).await; - let state_handle = state.lock().await.borrow().clone(); + let state_handle = state.lock().await.clone(); assert_eq!(state_handle.tasks().len(), 1); assert_eq!( state_handle.tasks().get("task").unwrap().status,