fix(core): simplify action dispatch, use mutex locking, and avoid unnecessary clones (#31157)

<!-- Please make sure you have read the submission guidelines before
posting an PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
<!-- This is the behavior we have today -->
* There are many unwraps that we can avoid by using `parking_lot::mutex`
* There is an unnecessary `task::spawn` to dispatch actions. The channel
is non-blocking and this is wasted cycles
* There are a bunch of clones that are happening with the PtyInstances

## Expected Behavior
<!-- This is the behavior we should expect with the changes in this PR
-->
* Uses `parking_lot::Mutex` and removes `unwraps()`
* Dispatches actions directly without going into tasks
* Uses tuple derefs to avoid clones, and uses 2 instead of 4

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #
This commit is contained in:
Jonathan Cammisuli 2025-05-09 15:40:07 -04:00 committed by GitHub
parent 8082184dc7
commit f5cc2d51b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 58 additions and 65 deletions

View File

@ -1034,10 +1034,11 @@ impl App {
/// Dispatches an action to the action tx for other components to handle however they see fit /// Dispatches an action to the action tx for other components to handle however they see fit
fn dispatch_action(&self, action: Action) { fn dispatch_action(&self, action: Action) {
let tx = self.action_tx.clone().unwrap(); if let Some(tx) = &self.action_tx {
tokio::spawn(async move { tx.send(action).unwrap_or_else(|e| {
let _ = tx.send(action); debug!("Failed to dispatch action: {}", e);
}); });
}
} }
fn recalculate_layout_areas(&mut self) { fn recalculate_layout_areas(&mut self) {
@ -1423,15 +1424,17 @@ impl App {
parser_and_writer: External<(ParserArc, WriterArc)>, parser_and_writer: External<(ParserArc, WriterArc)>,
) { ) {
// Access the contents of the External // Access the contents of the External
let parser_and_writer_clone = parser_and_writer.clone();
let (parser, writer) = &parser_and_writer_clone;
let pty = Arc::new( let pty = Arc::new(
PtyInstance::new(task_id.to_string(), parser.clone(), writer.clone()) PtyInstance::new(
.map_err(|e| napi::Error::from_reason(format!("Failed to create PTY: {}", e))) task_id.to_string(),
.unwrap(), parser_and_writer.0.clone(),
parser_and_writer.1.clone(),
)
.map_err(|e| napi::Error::from_reason(format!("Failed to create PTY: {}", e)))
.unwrap(),
); );
self.pty_instances.insert(task_id.to_string(), pty.clone()); self.pty_instances.insert(task_id.to_string(), pty);
} }
fn create_empty_parser_and_noop_writer() -> (ParserArc, External<(ParserArc, WriterArc)>) { fn create_empty_parser_and_noop_writer() -> (ParserArc, External<(ParserArc, WriterArc)>) {

View File

@ -1,7 +1,8 @@
use napi::bindgen_prelude::*; use napi::bindgen_prelude::*;
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction}; use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction};
use napi::JsObject; use napi::JsObject;
use std::sync::{Arc, Mutex}; use parking_lot::Mutex;
use std::sync::Arc;
use tracing::debug; use tracing::debug;
use super::app::App; use super::app::App;
@ -85,7 +86,7 @@ impl AppLifeCycle {
let initiating_tasks = initiating_tasks.into_iter().collect(); let initiating_tasks = initiating_tasks.into_iter().collect();
Self { Self {
app: Arc::new(std::sync::Mutex::new( app: Arc::new(Mutex::new(
App::new( App::new(
tasks.into_iter().collect(), tasks.into_iter().collect(),
initiating_tasks, initiating_tasks,
@ -101,9 +102,8 @@ impl AppLifeCycle {
#[napi] #[napi]
pub fn start_command(&mut self, thread_count: Option<u32>) -> napi::Result<()> { pub fn start_command(&mut self, thread_count: Option<u32>) -> napi::Result<()> {
if let Ok(mut app) = self.app.lock() { self.app.lock().start_command(thread_count);
app.start_command(thread_count);
}
Ok(()) Ok(())
} }
@ -115,9 +115,7 @@ impl AppLifeCycle {
#[napi] #[napi]
pub fn start_tasks(&mut self, tasks: Vec<Task>, _metadata: JsObject) -> napi::Result<()> { pub fn start_tasks(&mut self, tasks: Vec<Task>, _metadata: JsObject) -> napi::Result<()> {
if let Ok(mut app) = self.app.lock() { self.app.lock().start_tasks(tasks);
app.start_tasks(tasks);
}
Ok(()) Ok(())
} }
@ -129,9 +127,7 @@ impl AppLifeCycle {
output: String, output: String,
) -> napi::Result<()> { ) -> napi::Result<()> {
debug!("Received task terminal output for {}", task.id); debug!("Received task terminal output for {}", task.id);
if let Ok(mut app) = self.app.lock() { self.app.lock().print_task_terminal_output(task.id, output);
app.print_task_terminal_output(task.id, output);
}
Ok(()) Ok(())
} }
@ -141,17 +137,14 @@ impl AppLifeCycle {
task_results: Vec<TaskResult>, task_results: Vec<TaskResult>,
_metadata: JsObject, _metadata: JsObject,
) -> napi::Result<()> { ) -> napi::Result<()> {
if let Ok(mut app) = self.app.lock() { self.app.lock().end_tasks(task_results);
app.end_tasks(task_results);
}
Ok(()) Ok(())
} }
#[napi] #[napi]
pub fn end_command(&self) -> napi::Result<()> { pub fn end_command(&self) -> napi::Result<()> {
if let Ok(mut app) = self.app.lock() { self.app.lock().end_command();
app.end_command();
}
Ok(()) Ok(())
} }
@ -198,45 +191,43 @@ impl AppLifeCycle {
debug!("Initialized Action Channel"); debug!("Initialized Action Channel");
// Initialize components // Initialize components
if let Ok(mut app) = app_mutex.lock() { let mut app_guard = app_mutex.lock();
// Store callback for cleanup // Store callback for cleanup
app.set_done_callback(done_callback); app_guard.set_done_callback(done_callback);
app.register_action_handler(action_tx.clone()).ok(); app_guard.register_action_handler(action_tx.clone()).ok();
for component in app.components.iter_mut() { for component in app_guard.components.iter_mut() {
component.register_action_handler(action_tx.clone()).ok(); component.register_action_handler(action_tx.clone()).ok();
}
app.init(tui.size().unwrap()).ok();
for component in app.components.iter_mut() {
component.init(tui.size().unwrap()).ok();
}
} }
app_guard.init(tui.size().unwrap()).ok();
for component in app_guard.components.iter_mut() {
component.init(tui.size().unwrap()).ok();
}
drop(app_guard);
debug!("Initialized Components"); debug!("Initialized Components");
napi::tokio::spawn(async move { napi::tokio::spawn(async move {
loop { loop {
// Handle events using our Tui abstraction // Handle events using our Tui abstraction
if let Some(event) = tui.next().await { if let Some(event) = tui.next().await {
if let Ok(mut app) = app_mutex.lock() { let mut app = app_mutex.lock();
let _ = app.handle_event(event, &action_tx); let _ = app.handle_event(event, &action_tx);
// Check if we should quit based on the timer // Check if we should quit based on the timer
if let Some(quit_time) = app.quit_at { if let Some(quit_time) = app.quit_at {
if std::time::Instant::now() >= quit_time { if std::time::Instant::now() >= quit_time {
tui.exit().ok(); tui.exit().ok();
app.call_done_callback(); app.call_done_callback();
break; break;
}
} }
} }
} }
// Process actions // Process actions
while let Ok(action) = action_rx.try_recv() { while let Ok(action) = action_rx.try_recv() {
if let Ok(mut app) = app_mutex.lock() { app_mutex.lock().handle_action(&mut tui, action, &action_tx);
app.handle_action(&mut tui, action, &action_tx);
}
} }
} }
}); });
@ -250,29 +241,29 @@ impl AppLifeCycle {
task_id: String, task_id: String,
parser_and_writer: External<(ParserArc, WriterArc)>, parser_and_writer: External<(ParserArc, WriterArc)>,
) { ) {
let mut app = self.app.lock().unwrap(); self.app
app.register_running_task(task_id, parser_and_writer) .lock()
.register_running_task(task_id, parser_and_writer)
} }
#[napi] #[napi]
pub fn register_running_task_with_empty_parser(&mut self, task_id: String) { pub fn register_running_task_with_empty_parser(&mut self, task_id: String) {
let mut app = self.app.lock().unwrap(); self.app
app.register_running_task_with_empty_parser(task_id) .lock()
.register_running_task_with_empty_parser(task_id)
} }
#[napi] #[napi]
pub fn append_task_output(&mut self, task_id: String, output: String, is_pty_output: bool) { pub fn append_task_output(&mut self, task_id: String, output: String, is_pty_output: bool) {
// If its from a pty, we already have it in the parser, so we don't need to append it again // If its from a pty, we already have it in the parser, so we don't need to append it again
if !is_pty_output { if !is_pty_output {
let mut app = self.app.lock().unwrap(); self.app.lock().append_task_output(task_id, output)
app.append_task_output(task_id, output)
} }
} }
#[napi] #[napi]
pub fn set_task_status(&mut self, task_id: String, status: TaskStatus) { pub fn set_task_status(&mut self, task_id: String, status: TaskStatus) {
let mut app = self.app.lock().unwrap(); self.app.lock().update_task_status(task_id, status)
app.update_task_status(task_id, status)
} }
#[napi] #[napi]
@ -280,18 +271,17 @@ impl AppLifeCycle {
&self, &self,
forced_shutdown_callback: ThreadsafeFunction<(), ErrorStrategy::Fatal>, forced_shutdown_callback: ThreadsafeFunction<(), ErrorStrategy::Fatal>,
) -> napi::Result<()> { ) -> napi::Result<()> {
if let Ok(mut app) = self.app.lock() { self.app
app.set_forced_shutdown_callback(forced_shutdown_callback); .lock()
} .set_forced_shutdown_callback(forced_shutdown_callback);
Ok(()) Ok(())
} }
// Rust-only lifecycle method // Rust-only lifecycle method
#[napi(js_name = "__setCloudMessage")] #[napi(js_name = "__setCloudMessage")]
pub async fn __set_cloud_message(&self, message: String) -> napi::Result<()> { pub async fn __set_cloud_message(&self, message: String) -> napi::Result<()> {
if let Ok(mut app) = self.app.lock() { self.app.lock().set_cloud_message(Some(message));
app.set_cloud_message(Some(message));
}
Ok(()) Ok(())
} }
} }