From 149d47c5efc4db05a6b83ab8a83e88c7e0d10eba Mon Sep 17 00:00:00 2001
From: Jorge Prendes
Date: Tue, 14 Apr 2026 15:04:38 +0100
Subject: [PATCH] Acquire tokio mutex before spawn_blocking to avoid blocking
 pool threads while waiting for locks

Signed-off-by: Jorge Prendes
---
 src/js-host-api/src/lib.rs | 263 ++++++++++++++++++++++---------------
 1 file changed, 156 insertions(+), 107 deletions(-)

diff --git a/src/js-host-api/src/lib.rs b/src/js-host-api/src/lib.rs
index a8a9678..34facbf 100644
--- a/src/js-host-api/src/lib.rs
+++ b/src/js-host-api/src/lib.rs
@@ -28,7 +28,7 @@ use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
 use napi::{tokio, Status};
 use napi_derive::napi;
 use serde_json::Value as JsonValue;
-use tokio::sync::oneshot;
+use tokio::sync::{oneshot, Mutex as AsyncMutex, OwnedMappedMutexGuard, OwnedMutexGuard};
 
 // ── napi-rs wrapper architecture ──────────────────────────────────────
 //
@@ -47,6 +47,20 @@ use tokio::sync::oneshot;
 // `spawn_blocking` closure on a background thread — we clone the `Arc` so the
 // wrapper struct remains valid on the JS side while the Rust side works.
 //
+// ## Mutex choice: std vs tokio
+//
+// Most wrappers (`SandboxBuilderWrapper`, `ProtoJSSandboxWrapper`,
+// `JSSandboxWrapper`) use `std::sync::Mutex` since they only hold locks
+// briefly for quick operations.
+//
+// `LoadedJSSandboxWrapper` uses `tokio::sync::Mutex` because `call_handler()`
+// holds the lock for the **entire duration** of guest code execution
+// (potentially seconds). Using an async mutex allows:
+// - Async methods (`unload`, `dispose`) to `.lock().await` without blocking
+// - Long-running methods (`call_handler`, `snapshot`, `restore`) to acquire the
+//   lock asynchronously, then move the `OwnedMutexGuard` into `spawn_blocking`
+//   for CPU-intensive guest execution
+//
 // ## Why `LoadedJSSandboxWrapper` stores fields outside the Mutex
 //
 // `call_handler()` holds the Mutex for the **entire duration** of
@@ -786,7 +800,7 @@ impl JSSandboxWrapper {
         let interrupt = loaded_sandbox.interrupt_handle();
         let poisoned_flag = Arc::new(AtomicBool::new(loaded_sandbox.poisoned()));
         Ok(LoadedJSSandboxWrapper {
-            inner: Arc::new(Mutex::new(Some(loaded_sandbox))),
+            inner: Arc::new(AsyncMutex::new(Some(loaded_sandbox))),
             interrupt,
             poisoned_flag,
             last_call_stats: Arc::new(ArcSwapOption::empty()),
@@ -836,7 +850,7 @@ impl JSSandboxWrapper {
 /// ```
 #[napi(js_name = "LoadedJSSandbox")]
 pub struct LoadedJSSandboxWrapper {
-    inner: Arc<Mutex<Option<LoadedJSSandbox>>>,
+    inner: Arc<AsyncMutex<Option<LoadedJSSandbox>>>,
 
     /// Stored **outside** the Mutex so callers can `kill()` a running handler.
     ///
@@ -875,6 +889,68 @@ pub struct LoadedJSSandboxWrapper {
     disposed_flag: Arc<AtomicBool>,
 }
 
+type LoadedJSSandboxGuard = OwnedMappedMutexGuard<Option<LoadedJSSandbox>, LoadedJSSandbox>;
+
+impl LoadedJSSandboxWrapper {
+    /// Borrow the inner value mutably via Mutex, or error if consumed.
+    async fn with_inner<R>(
+        &self,
+        f: impl AsyncFnOnce(LoadedJSSandboxGuard) -> napi::Result<R>,
+    ) -> napi::Result<R> {
+        let sandbox = self.inner.clone().lock_owned().await;
+        let sandbox = OwnedMutexGuard::try_map(sandbox, Option::as_mut)
+            .map_err(|_| consumed_error("LoadedJSSandbox"))?;
+        f(sandbox).await
+    }
+
+    /// Borrow the inner value mutably via Mutex, or error if consumed.
+    /// The closure `f` will run using spawn_blocking, so it can perform long-running operations without
+    /// blocking the Node.js event loop. This is the main way to interact with the inner `LoadedJSSandbox`.
+    async fn with_blocking_inner<R: Send + 'static>(
+        &self,
+        f: impl FnOnce(LoadedJSSandboxGuard) -> napi::Result<R> + Send + 'static,
+    ) -> napi::Result<R> {
+        self.with_inner(async move |sandbox| {
+            tokio::task::spawn_blocking(move || f(sandbox))
+                .await
+                .map_err(join_error)?
+        })
+        .await
+    }
+
+    /// Take ownership of the inner value, returning a consumed-state error if
+    /// this instance has already been used.
+    async fn take_inner_with<R>(
+        &self,
+        f: impl AsyncFnOnce(LoadedJSSandbox) -> napi::Result<R>,
+    ) -> napi::Result<R> {
+        let sandbox = self
+            .inner
+            .lock()
+            .await
+            .take()
+            .ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
+        self.disposed_flag.store(true, Ordering::Release);
+        f(sandbox).await
+    }
+
+    /// Take ownership of the inner value, returning a consumed-state error if
+    /// this instance has already been used.
+    /// The closure `f` will run using spawn_blocking, so it can perform long-running operations without
+    /// blocking the Node.js event loop. This is the main way to interact with the inner `LoadedJSSandbox`.
+    async fn take_blocking_inner_with<R: Send + 'static>(
+        &self,
+        f: impl FnOnce(LoadedJSSandbox) -> napi::Result<R> + Send + 'static,
+    ) -> napi::Result<R> {
+        self.take_inner_with(async move |sandbox| {
+            tokio::task::spawn_blocking(move || f(sandbox))
+                .await
+                .map_err(join_error)?
+        })
+        .await
+    }
+}
+
 #[napi]
 impl LoadedJSSandboxWrapper {
     /// Invoke a handler function with the given event data, optionally
@@ -939,7 +1015,6 @@ impl LoadedJSSandboxWrapper {
             )));
         }
 
-        let inner = self.inner.clone();
         let poisoned_flag = self.poisoned_flag.clone();
         let last_call_stats_store = self.last_call_stats.clone();
         let gc = options.gc;
@@ -950,68 +1025,64 @@ impl LoadedJSSandboxWrapper {
         let event_json = serde_json::to_string(&event_data)
             .map_err(|e| invalid_arg_error(&format!("Failed to serialize event: {e}")))?;
 
-        let result_json = tokio::task::spawn_blocking(move || {
-            let mut guard = inner.lock().map_err(|_| lock_error())?;
-            let sandbox = guard
-                .as_mut()
-                .ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
-
-            // Dispatch to the appropriate Rust method based on whether
-            // any monitor timeouts are specified.
-            //
-            // The three `handle_event_with_monitor` arms look duplicated, but
-            // each constructs a different concrete monitor type (single or tuple).
-            // The sealed `MonitorSet` trait is not object-safe, so we can't
-            // erase the type behind a `dyn` — the match is structurally required.
-            let result = match (wall_clock_timeout_ms, cpu_timeout_ms) {
-                // No monitors — fast path
-                (None, None) => sandbox
-                    .handle_event(handler_name, event_json, gc)
-                    .map_err(to_napi_error),
-                // Both — tuple with OR semantics (recommended)
-                (Some(wall_ms), Some(cpu_ms)) => {
-                    let monitor = (
-                        WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
-                            .map_err(to_napi_error)?,
-                        CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
-                            .map_err(to_napi_error)?,
-                    );
-                    sandbox
-                        .handle_event_with_monitor(handler_name, event_json, &monitor, gc)
-                        .map_err(to_napi_error)
-                }
-                // Wall-clock only
-                (Some(wall_ms), None) => {
-                    let monitor = WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
-                        .map_err(to_napi_error)?;
+        let result_json = self
+            .with_blocking_inner(move |mut sandbox| {
+                // Dispatch to the appropriate Rust method based on whether
+                // any monitor timeouts are specified.
+                //
+                // The three `handle_event_with_monitor` arms look duplicated, but
+                // each constructs a different concrete monitor type (single or tuple).
+                // The sealed `MonitorSet` trait is not object-safe, so we can't
+                // erase the type behind a `dyn` — the match is structurally required.
+                let result = match (wall_clock_timeout_ms, cpu_timeout_ms) {
+                    // No monitors — fast path
+                    (None, None) => sandbox
+                        .handle_event(handler_name, event_json, gc)
+                        .map_err(to_napi_error),
+                    // Both — tuple with OR semantics (recommended)
+                    (Some(wall_ms), Some(cpu_ms)) => {
+                        let monitor = (
+                            WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
+                                .map_err(to_napi_error)?,
+                            CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
+                                .map_err(to_napi_error)?,
+                        );
+                        sandbox
+                            .handle_event_with_monitor(handler_name, event_json, &monitor, gc)
+                            .map_err(to_napi_error)
+                    }
+                    // Wall-clock only
+                    (Some(wall_ms), None) => {
+                        let monitor = WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
+                            .map_err(to_napi_error)?;
+                        sandbox
+                            .handle_event_with_monitor(handler_name, event_json, &monitor, gc)
+                            .map_err(to_napi_error)
+                    }
+                    // CPU only
+                    (None, Some(cpu_ms)) => {
+                        let monitor = CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
+                            .map_err(to_napi_error)?;
+                        sandbox
+                            .handle_event_with_monitor(handler_name, event_json, &monitor, gc)
+                            .map_err(to_napi_error)
+                    }
+                };
+                // Update poisoned flag while we hold the lock — keeps the getter
+                // lock-free so it never blocks the Node.js event loop.
+                poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
+
+                // Copy execution stats while we still hold the lock.
+                last_call_stats_store.store(
                     sandbox
-                        .handle_event_with_monitor(handler_name, event_json, &monitor, gc)
-                        .map_err(to_napi_error)
-                }
-                // CPU only
-                (None, Some(cpu_ms)) => {
-                    let monitor = CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
-                        .map_err(to_napi_error)?;
-                    sandbox
-                        .handle_event_with_monitor(handler_name, event_json, &monitor, gc)
-                        .map_err(to_napi_error)
-                }
-            };
-            // Update poisoned flag while we hold the lock — keeps the getter
-            // lock-free so it never blocks the Node.js event loop.
-            poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
+                        .last_call_stats()
+                        .map(|s| Arc::new(CallStats::from(s))),
+                );
 
-            // Copy execution stats while we still hold the lock.
-            last_call_stats_store.store(
-                sandbox
-                    .last_call_stats()
-                    .map(|s| Arc::new(CallStats::from(s))),
-            );
+                result
+            })
+            .await?;
 
-            result
-        })
-        .await
-        .map_err(join_error)??;
         // Parse the JSON string result back into a JS object
         serde_json::from_str(&result_json).map_err(|e| {
             hl_error(
@@ -1032,18 +1103,9 @@ impl LoadedJSSandboxWrapper {
     /// @throws If already consumed
     #[napi]
     pub async fn unload(&self) -> napi::Result<JSSandboxWrapper> {
-        let inner = self.inner.clone();
-        let disposed = self.disposed_flag.clone();
-        let js_sandbox = tokio::task::spawn_blocking(move || {
-            let mut guard = inner.lock().map_err(|_| lock_error())?;
-            let loaded = guard
-                .take()
-                .ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
-            disposed.store(true, Ordering::Release);
-            loaded.unload().map_err(to_napi_error)
-        })
-        .await
-        .map_err(join_error)??;
+        let js_sandbox = self
+            .take_blocking_inner_with(|sandbox| sandbox.unload().map_err(to_napi_error))
+            .await?;
         Ok(JSSandboxWrapper {
             inner: Arc::new(Mutex::new(Some(js_sandbox))),
         })
@@ -1145,19 +1207,15 @@ impl LoadedJSSandboxWrapper {
     /// @throws If already consumed
     #[napi]
     pub async fn snapshot(&self) -> napi::Result<SnapshotWrapper> {
-        let inner = self.inner.clone();
         let poisoned_flag = self.poisoned_flag.clone();
-        let snapshot = tokio::task::spawn_blocking(move || {
-            let mut guard = inner.lock().map_err(|_| lock_error())?;
-            let sandbox = guard
-                .as_mut()
-                .ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
-            let result = sandbox.snapshot().map_err(to_napi_error);
-            poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
-            result
-        })
-        .await
-        .map_err(join_error)??;
+
+        let snapshot = self
+            .with_blocking_inner(move |mut sandbox| {
+                let result = sandbox.snapshot().map_err(to_napi_error);
+                poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
+                result
+            })
+            .await?;
         Ok(SnapshotWrapper { inner: snapshot })
     }
 
@@ -1172,20 +1230,17 @@ impl LoadedJSSandboxWrapper {
     /// @throws If the snapshot doesn't match this sandbox, or if consumed
     #[napi]
     pub async fn restore(&self, snapshot: &SnapshotWrapper) -> napi::Result<()> {
-        let inner = self.inner.clone();
         let snap = snapshot.inner.clone();
         let poisoned_flag = self.poisoned_flag.clone();
-        tokio::task::spawn_blocking(move || {
-            let mut guard = inner.lock().map_err(|_| lock_error())?;
-            let sandbox = guard
-                .as_mut()
-                .ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
+
+        self.with_blocking_inner(move |mut sandbox| {
             let result = sandbox.restore(snap).map_err(to_napi_error);
             poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
             result
         })
-        .await
-        .map_err(join_error)?
+        .await?;
+
+        Ok(())
     }
 
     /// Eagerly release the underlying sandbox resources.
@@ -1198,18 +1253,12 @@ impl LoadedJSSandboxWrapper {
     /// Calling `dispose()` on an already-consumed sandbox is a no-op.
     #[napi]
     pub async fn dispose(&self) -> napi::Result<()> {
-        if self.disposed_flag.load(Ordering::Acquire) {
-            return Ok(());
-        }
-        let inner = self.inner.clone();
-        let disposed = self.disposed_flag.clone();
-        tokio::task::spawn_blocking(move || {
-            let _ = inner.lock().map_err(|_| lock_error())?.take();
-            disposed.store(true, Ordering::Release);
-            Ok(())
-        })
-        .await
-        .map_err(join_error)?
+        // Not routed through `take_inner_with`: that helper reports an empty
+        // slot as a consumed-state error, but disposing twice must be a no-op.
+        let sandbox = self.inner.lock().await.take();
+        self.disposed_flag.store(true, Ordering::Release);
+        drop(sandbox);
+        Ok(())
     }
 }
 