Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 152 additions & 107 deletions src/js-host-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi::{tokio, Status};
use napi_derive::napi;
use serde_json::Value as JsonValue;
use tokio::sync::oneshot;
use tokio::sync::{oneshot, Mutex as AsyncMutex, OwnedMappedMutexGuard, OwnedMutexGuard};

// ── napi-rs wrapper architecture ──────────────────────────────────────
//
Expand All @@ -47,6 +47,20 @@ use tokio::sync::oneshot;
// `spawn_blocking` closure on a background thread — we clone the `Arc` so the
// wrapper struct remains valid on the JS side while the Rust side works.
//
// ## Mutex choice: std vs tokio
//
// Most wrappers (`SandboxBuilderWrapper`, `ProtoJSSandboxWrapper`,
// `JSSandboxWrapper`) use `std::sync::Mutex` since they only hold locks
// briefly for quick operations.
//
// `LoadedJSSandboxWrapper` uses `tokio::sync::Mutex` because `call_handler()`
// holds the lock for the **entire duration** of guest code execution
// (potentially seconds). Using an async mutex allows:
// - Async methods (`unload`, `dispose`) to `.lock().await` without blocking
// - Long-running methods (`call_handler`, `snapshot`, `restore`) to acquire the
// lock asynchronously, then move the `OwnedMutexGuard` into `spawn_blocking`
// for CPU-intensive guest execution
//
// ## Why `LoadedJSSandboxWrapper` stores fields outside the Mutex
//
// `call_handler()` holds the Mutex for the **entire duration** of
Expand Down Expand Up @@ -786,7 +800,7 @@ impl JSSandboxWrapper {
let interrupt = loaded_sandbox.interrupt_handle();
let poisoned_flag = Arc::new(AtomicBool::new(loaded_sandbox.poisoned()));
Ok(LoadedJSSandboxWrapper {
inner: Arc::new(Mutex::new(Some(loaded_sandbox))),
inner: Arc::new(AsyncMutex::new(Some(loaded_sandbox))),
interrupt,
poisoned_flag,
last_call_stats: Arc::new(ArcSwapOption::empty()),
Expand Down Expand Up @@ -836,7 +850,7 @@ impl JSSandboxWrapper {
/// ```
#[napi(js_name = "LoadedJSSandbox")]
pub struct LoadedJSSandboxWrapper {
inner: Arc<Mutex<Option<LoadedJSSandbox>>>,
inner: Arc<AsyncMutex<Option<LoadedJSSandbox>>>,

/// Stored **outside** the Mutex so callers can `kill()` a running handler.
///
Expand Down Expand Up @@ -875,6 +889,68 @@ pub struct LoadedJSSandboxWrapper {
disposed_flag: Arc<AtomicBool>,
}

type LoadedJSSandboxGuard = OwnedMappedMutexGuard<Option<LoadedJSSandbox>, LoadedJSSandbox>;

impl LoadedJSSandboxWrapper {
/// Borrow the inner value mutably via Mutex, or error if consumed.
async fn with_inner<R>(
&self,
f: impl AsyncFnOnce(LoadedJSSandboxGuard) -> napi::Result<R>,
) -> napi::Result<R> {
let sandbox = self.inner.clone().lock_owned().await;
let sandbox = OwnedMutexGuard::try_map(sandbox, Option::as_mut)
.map_err(|_| consumed_error("LoadedJSSandbox"))?;
f(sandbox).await
}

/// Borrow the inner value mutably via Mutex, or error if consumed.
/// The closure `f` will run using spawn_blocking, so it can perform long-running operations without
/// blocking the Node.js event loop. This is the main way to interact with the inner `LoadedJSSandbox`.
async fn with_blocking_inner<R: Send + 'static>(
&self,
f: impl FnOnce(LoadedJSSandboxGuard) -> napi::Result<R> + Send + 'static,
) -> napi::Result<R> {
self.with_inner(async move |sandbox| {
tokio::task::spawn_blocking(move || f(sandbox))
.await
.map_err(join_error)?
})
.await
}

/// Take ownership of the inner value, returning a consumed-state error if
/// this instance has already been used.
async fn take_inner_with<R>(
&self,
f: impl AsyncFnOnce(LoadedJSSandbox) -> napi::Result<R>,
) -> napi::Result<R> {
let sandbox = self
.inner
.lock()
.await
.take()
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
self.disposed_flag.store(true, Ordering::Release);
f(sandbox).await
}

/// Take ownership of the inner value, returning a consumed-state error if
/// this instance has already been used.
/// The closure `f` will run using spawn_blocking, so it can perform long-running operations without
/// blocking the Node.js event loop. This is the main way to interact with the inner `LoadedJSSandbox`.
async fn take_blocking_inner_with<R: Send + 'static>(
&self,
f: impl FnOnce(LoadedJSSandbox) -> napi::Result<R> + Send + 'static,
) -> napi::Result<R> {
self.take_inner_with(async move |sandbox| {
tokio::task::spawn_blocking(move || f(sandbox))
.await
.map_err(join_error)?
})
.await
}
}

#[napi]
impl LoadedJSSandboxWrapper {
/// Invoke a handler function with the given event data, optionally
Expand Down Expand Up @@ -939,7 +1015,6 @@ impl LoadedJSSandboxWrapper {
)));
}

let inner = self.inner.clone();
let poisoned_flag = self.poisoned_flag.clone();
let last_call_stats_store = self.last_call_stats.clone();
let gc = options.gc;
Expand All @@ -950,68 +1025,64 @@ impl LoadedJSSandboxWrapper {
let event_json = serde_json::to_string(&event_data)
.map_err(|e| invalid_arg_error(&format!("Failed to serialize event: {e}")))?;

let result_json = tokio::task::spawn_blocking(move || {
let mut guard = inner.lock().map_err(|_| lock_error())?;
let sandbox = guard
.as_mut()
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;

// Dispatch to the appropriate Rust method based on whether
// any monitor timeouts are specified.
//
// The three `handle_event_with_monitor` arms look duplicated, but
// each constructs a different concrete monitor type (single or tuple).
// The sealed `MonitorSet` trait is not object-safe, so we can't
// erase the type behind a `dyn` — the match is structurally required.
let result = match (wall_clock_timeout_ms, cpu_timeout_ms) {
// No monitors — fast path
(None, None) => sandbox
.handle_event(handler_name, event_json, gc)
.map_err(to_napi_error),
// Both — tuple with OR semantics (recommended)
(Some(wall_ms), Some(cpu_ms)) => {
let monitor = (
WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
.map_err(to_napi_error)?,
CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
.map_err(to_napi_error)?,
);
sandbox
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
.map_err(to_napi_error)
}
// Wall-clock only
(Some(wall_ms), None) => {
let monitor = WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
.map_err(to_napi_error)?;
let result_json = self
.with_blocking_inner(move |mut sandbox| {
// Dispatch to the appropriate Rust method based on whether
// any monitor timeouts are specified.
//
// The three `handle_event_with_monitor` arms look duplicated, but
// each constructs a different concrete monitor type (single or tuple).
// The sealed `MonitorSet` trait is not object-safe, so we can't
// erase the type behind a `dyn` — the match is structurally required.
let result = match (wall_clock_timeout_ms, cpu_timeout_ms) {
// No monitors — fast path
(None, None) => sandbox
.handle_event(handler_name, event_json, gc)
.map_err(to_napi_error),
// Both — tuple with OR semantics (recommended)
(Some(wall_ms), Some(cpu_ms)) => {
let monitor = (
WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
.map_err(to_napi_error)?,
CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
.map_err(to_napi_error)?,
);
sandbox
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
.map_err(to_napi_error)
}
// Wall-clock only
(Some(wall_ms), None) => {
let monitor = WallClockMonitor::new(Duration::from_millis(wall_ms as u64))
.map_err(to_napi_error)?;
sandbox
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
.map_err(to_napi_error)
}
// CPU only
(None, Some(cpu_ms)) => {
let monitor = CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
.map_err(to_napi_error)?;
sandbox
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
.map_err(to_napi_error)
}
};
// Update poisoned flag while we hold the lock — keeps the getter
// lock-free so it never blocks the Node.js event loop.
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);

// Copy execution stats while we still hold the lock.
last_call_stats_store.store(
sandbox
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
.map_err(to_napi_error)
}
// CPU only
(None, Some(cpu_ms)) => {
let monitor = CpuTimeMonitor::new(Duration::from_millis(cpu_ms as u64))
.map_err(to_napi_error)?;
sandbox
.handle_event_with_monitor(handler_name, event_json, &monitor, gc)
.map_err(to_napi_error)
}
};
// Update poisoned flag while we hold the lock — keeps the getter
// lock-free so it never blocks the Node.js event loop.
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
.last_call_stats()
.map(|s| Arc::new(CallStats::from(s))),
);

// Copy execution stats while we still hold the lock.
last_call_stats_store.store(
sandbox
.last_call_stats()
.map(|s| Arc::new(CallStats::from(s))),
);
result
})
.await?;

result
})
.await
.map_err(join_error)??;
// Parse the JSON string result back into a JS object
serde_json::from_str(&result_json).map_err(|e| {
hl_error(
Expand All @@ -1032,18 +1103,9 @@ impl LoadedJSSandboxWrapper {
/// @throws If already consumed
#[napi]
pub async fn unload(&self) -> napi::Result<JSSandboxWrapper> {
let inner = self.inner.clone();
let disposed = self.disposed_flag.clone();
let js_sandbox = tokio::task::spawn_blocking(move || {
let mut guard = inner.lock().map_err(|_| lock_error())?;
let loaded = guard
.take()
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
disposed.store(true, Ordering::Release);
loaded.unload().map_err(to_napi_error)
})
.await
.map_err(join_error)??;
let js_sandbox = self
.take_blocking_inner_with(|sandbox| sandbox.unload().map_err(to_napi_error))
.await?;
Ok(JSSandboxWrapper {
inner: Arc::new(Mutex::new(Some(js_sandbox))),
})
Expand Down Expand Up @@ -1145,19 +1207,15 @@ impl LoadedJSSandboxWrapper {
/// @throws If already consumed
#[napi]
pub async fn snapshot(&self) -> napi::Result<SnapshotWrapper> {
let inner = self.inner.clone();
let poisoned_flag = self.poisoned_flag.clone();
let snapshot = tokio::task::spawn_blocking(move || {
let mut guard = inner.lock().map_err(|_| lock_error())?;
let sandbox = guard
.as_mut()
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;
let result = sandbox.snapshot().map_err(to_napi_error);
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
result
})
.await
.map_err(join_error)??;

let snapshot = self
.with_blocking_inner(move |mut sandbox| {
let result = sandbox.snapshot().map_err(to_napi_error);
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
result
})
.await?;
Ok(SnapshotWrapper { inner: snapshot })
}

Expand All @@ -1172,20 +1230,17 @@ impl LoadedJSSandboxWrapper {
/// @throws If the snapshot doesn't match this sandbox, or if consumed
#[napi]
pub async fn restore(&self, snapshot: &SnapshotWrapper) -> napi::Result<()> {
let inner = self.inner.clone();
let snap = snapshot.inner.clone();
let poisoned_flag = self.poisoned_flag.clone();
tokio::task::spawn_blocking(move || {
let mut guard = inner.lock().map_err(|_| lock_error())?;
let sandbox = guard
.as_mut()
.ok_or_else(|| consumed_error("LoadedJSSandbox"))?;

self.with_blocking_inner(move |mut sandbox| {
let result = sandbox.restore(snap).map_err(to_napi_error);
poisoned_flag.store(sandbox.poisoned(), Ordering::Release);
result
})
.await
.map_err(join_error)?
.await?;

Ok(())
}

/// Eagerly release the underlying sandbox resources.
Expand All @@ -1198,18 +1253,8 @@ impl LoadedJSSandboxWrapper {
/// Calling `dispose()` on an already-consumed sandbox is a no-op.
#[napi]
pub async fn dispose(&self) -> napi::Result<()> {
if self.disposed_flag.load(Ordering::Acquire) {
return Ok(());
}
let inner = self.inner.clone();
let disposed = self.disposed_flag.clone();
tokio::task::spawn_blocking(move || {
let _ = inner.lock().map_err(|_| lock_error())?.take();
disposed.store(true, Ordering::Release);
Ok(())
})
.await
.map_err(join_error)?
self.take_inner_with(async |_| Ok(())).await?;
Ok(())
}
}

Expand Down
Loading