// Source: singularity-forge/rust-engine/crates/engine/src/watch.rs (262 lines, 8 KiB, Rust)

//! Recursive filesystem watcher exposed to JavaScript via napi-rs.
//!
//! Purpose: keep high-volume filesystem notifications out of the JavaScript
//! event loop while preserving a small, debounced batch API for agent sessions.
use dashmap::DashMap;
use globset::{GlobBuilder, GlobSet, GlobSetBuilder};
use napi::bindgen_prelude::*;
use napi::threadsafe_function::{
ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
};
use napi_derive::napi;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{mpsc, Arc};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
/// Counter that supplies the numeric handles returned by `watch_tree`; the first handle issued is 1.
static NEXT_HANDLE: AtomicU32 = AtomicU32::new(1);
/// Process-wide registry of live watchers keyed by handle, created lazily on first access.
static WATCHERS: std::sync::OnceLock<DashMap<u32, WatcherHandle>> = std::sync::OnceLock::new();
/// Returns the global watcher registry, initializing it with an empty map on the first call.
fn watchers() -> &'static DashMap<u32, WatcherHandle> {
WATCHERS.get_or_init(DashMap::new)
}
/// Everything the registry owns for one active watch; dropping it stops the watch.
struct WatcherHandle {
/// Held only so the OS watcher stays alive for as long as the handle exists; never read.
_watcher: RecommendedWatcher,
/// Shutdown flag shared with the worker thread, which polls it between batches.
stop: Arc<AtomicBool>,
/// Worker thread that batches events; wrapped in `Option` so `Drop` can take and join it.
thread: Option<JoinHandle<()>>,
}
impl Drop for WatcherHandle {
    /// Requests worker shutdown and blocks until the worker thread has exited.
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
        let Some(worker) = self.thread.take() else {
            return;
        };
        // A panic inside the worker surfaces here as `Err`; it is deliberately discarded
        // because there is nothing useful to do with it while dropping.
        let _ = worker.join();
    }
}
/// Optional settings accepted by `watchTree`; every field falls back to a default when omitted.
#[napi(object)]
pub struct WatchOptions {
/// Glob patterns for paths to ignore, matched against paths relative to the watch root.
/// Backslashes are treated as `/`, and a pattern without any `/` matches at any depth.
pub ignore: Option<Vec<String>>,
/// Batching window in milliseconds: events arriving within this long after the first
/// event of a batch are delivered together in one callback. Default 50.
#[napi(js_name = "debounceMs")]
pub debounce_ms: Option<u32>,
/// Watch the whole tree below the root rather than only the root itself. Default true.
pub recursive: Option<bool>,
}
/// One filesystem change delivered to the JavaScript callback.
#[napi(object)]
#[derive(Clone)]
pub struct WatchEvent {
/// One of "create" | "modify" | "remove" | "rename".
pub kind: String,
/// Path of the affected entry as reported by the watcher backend (expected to be
/// absolute); bytes that are not valid UTF-8 are replaced lossily.
pub path: String,
}
/// Compiles user-supplied ignore patterns into a single [`GlobSet`].
///
/// Each pattern is normalized before compilation:
/// - backslashes become `/`, so Windows-style patterns behave like POSIX ones;
/// - a pattern with no `/` that does not already start with `**` is prefixed
///   with `**/` so it matches at any depth below the root;
/// - a companion `<pattern>/**` glob is registered as well, so ignoring a
///   directory (for example `node_modules`) also ignores everything beneath it.
///   Without it only the directory entry itself would match, because
///   `literal_separator(true)` stops `*` from crossing `/`.
///
/// # Errors
/// Returns a human-readable message naming the offending pattern when a glob
/// fails to compile, or a generic message when the final set cannot be built.
fn build_ignore_set(patterns: &[String]) -> std::result::Result<GlobSet, String> {
    let compile = |glob: &str, original: &str| {
        GlobBuilder::new(glob)
            .literal_separator(true)
            .build()
            .map_err(|e| format!("invalid ignore pattern '{original}': {e}"))
    };

    let mut builder = GlobSetBuilder::new();
    for pattern in patterns {
        let normalized = pattern.replace('\\', "/");
        let full = if !normalized.contains('/') && !normalized.starts_with("**") {
            format!("**/{normalized}")
        } else {
            normalized
        };
        builder.add(compile(&full, pattern)?);

        // Also cover descendants of a matched directory. Skip patterns that already
        // end in `**` (they match descendants themselves) and degenerate patterns
        // that are empty once trailing slashes are removed.
        let base = full.trim_end_matches('/');
        if !base.is_empty() && !base.ends_with("**") {
            builder.add(compile(&format!("{base}/**"), pattern)?);
        }
    }
    builder
        .build()
        .map_err(|e| format!("failed to build ignore set: {e}"))
}
fn event_kind(kind: &EventKind) -> Option<&'static str> {
use notify::event::ModifyKind;
use notify::EventKind::*;
match kind {
Create(_) => Some("create"),
Remove(_) => Some("remove"),
Modify(ModifyKind::Name(_)) => Some("rename"),
Modify(_) => Some("modify"),
_ => None,
}
}
/// Reports whether `path` matches the ignore set when expressed relative to `root`.
///
/// Always `false` when `has_ignores` is unset, and also `false` for any path
/// that does not lie under `root`, since such a path cannot be made relative.
fn path_is_ignored(path: &Path, root: &Path, ignore_set: &GlobSet, has_ignores: bool) -> bool {
    if !has_ignores {
        return false;
    }
    match path.strip_prefix(root) {
        Ok(relative) => ignore_set.is_match(relative),
        Err(_) => false,
    }
}
/// Collects `first` plus every event that arrives before the debounce window closes.
///
/// The window is measured from the moment this function is called and is not
/// extended by later events. Collection also ends early if the sending side
/// of the channel has been dropped.
fn drain_batch(
    receiver: &mpsc::Receiver<notify::Result<Event>>,
    first: notify::Result<Event>,
    debounce: Duration,
) -> Vec<notify::Result<Event>> {
    let mut collected = vec![first];
    let cutoff = Instant::now() + debounce;
    loop {
        // Zero remaining time means the window has closed.
        let remaining = cutoff.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        // Both a timeout and a disconnected channel end the batch.
        let Ok(next) = receiver.recv_timeout(remaining) else {
            break;
        };
        collected.push(next);
    }
    collected
}
/// Turns raw notify results into the events delivered to JavaScript.
///
/// Backend errors and event kinds without a label are skipped. An event that
/// carries several paths produces one `WatchEvent` per path, and paths
/// matching the ignore set are left out.
fn convert_batch(
    root: &Path,
    ignore_set: &GlobSet,
    has_ignores: bool,
    raw_events: Vec<notify::Result<Event>>,
) -> Vec<WatchEvent> {
    let mut converted = Vec::new();
    // Flattening an iterator of `Result`s yields only the `Ok` values.
    for event in raw_events.into_iter().flatten() {
        if let Some(kind) = event_kind(&event.kind) {
            converted.extend(
                event
                    .paths
                    .into_iter()
                    .filter(|path| !path_is_ignored(path, root, ignore_set, has_ignores))
                    .map(|path| WatchEvent {
                        kind: kind.to_string(),
                        path: path.to_string_lossy().into_owned(),
                    }),
            );
        }
    }
    converted
}
/// Start watching a directory tree and return a numeric handle for `stopWatch`.
#[napi(
js_name = "watchTree",
ts_args_type = "root: string, options: WatchOptions | undefined | null, onEvents: (events: WatchEvent[]) => void"
)]
pub fn watch_tree(
_env: Env,
root: String,
options: Option<WatchOptions>,
on_events: JsFunction,
) -> Result<u32> {
let opts = options.unwrap_or(WatchOptions {
ignore: None,
debounce_ms: None,
recursive: None,
});
let debounce = Duration::from_millis(u64::from(opts.debounce_ms.unwrap_or(50)));
let recursive = opts.recursive.unwrap_or(true);
let root_path = PathBuf::from(&root);
if !root_path.exists() {
return Err(Error::new(
Status::InvalidArg,
format!("watch root does not exist: {root}"),
));
}
let ignore_patterns = opts.ignore.unwrap_or_default();
let ignore_set =
build_ignore_set(&ignore_patterns).map_err(|e| Error::new(Status::InvalidArg, e))?;
let has_ignores = !ignore_patterns.is_empty();
let tsfn: ThreadsafeFunction<Vec<WatchEvent>> = on_events.create_threadsafe_function(
0,
|ctx: ThreadSafeCallContext<Vec<WatchEvent>>| {
let events: Vec<WatchEvent> = ctx.value;
let env = ctx.env;
let mut arr = env.create_array_with_length(events.len())?;
for (i, event) in events.into_iter().enumerate() {
let mut obj = env.create_object()?;
obj.set_named_property("kind", env.create_string(&event.kind)?)?;
obj.set_named_property("path", env.create_string(&event.path)?)?;
arr.set_element(i as u32, obj)?;
}
Ok(vec![arr])
},
)?;
let (sender, receiver) = mpsc::channel();
let mut watcher = RecommendedWatcher::new(
move |result| {
let _ = sender.send(result);
},
Config::default(),
)
.map_err(|e| {
Error::new(
Status::GenericFailure,
format!("failed to create watcher: {e}"),
)
})?;
let mode = if recursive {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
};
watcher.watch(&root_path, mode).map_err(|e| {
Error::new(
Status::GenericFailure,
format!("failed to watch '{root}': {e}"),
)
})?;
let stop = Arc::new(AtomicBool::new(false));
let stop_thread = Arc::clone(&stop);
let root_thread = root_path.clone();
let tsfn_thread = tsfn.clone();
let thread = thread::spawn(move || {
while !stop_thread.load(Ordering::Relaxed) {
let first = match receiver.recv_timeout(Duration::from_millis(100)) {
Ok(event) => event,
Err(mpsc::RecvTimeoutError::Timeout) => continue,
Err(mpsc::RecvTimeoutError::Disconnected) => break,
};
let raw = drain_batch(&receiver, first, debounce);
let batch = convert_batch(&root_thread, &ignore_set, has_ignores, raw);
if !batch.is_empty() {
tsfn_thread.call(Ok(batch), ThreadsafeFunctionCallMode::NonBlocking);
}
}
});
let handle = NEXT_HANDLE.fetch_add(1, Ordering::Relaxed);
watchers().insert(
handle,
WatcherHandle {
_watcher: watcher,
stop,
thread: Some(thread),
},
);
Ok(handle)
}
/// Stop a watcher returned by `watchTree`.
///
/// Returns `true` when the handle was registered and `false` otherwise.
/// Removing the entry drops it, which signals the worker thread and waits for
/// it to exit, so this call blocks until the worker has finished.
#[napi(js_name = "stopWatch")]
pub fn stop_watch(handle: u32) -> Result<bool> {
    match watchers().remove(&handle) {
        Some(_entry) => Ok(true),
        None => Ok(false),
    }
}