feat(core): rust based watcher (#16915)

This commit is contained in:
Jonathan Cammisuli 2023-06-01 14:00:27 -04:00 committed by GitHub
parent f5e52dbcbf
commit a978ad3094
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 2361 additions and 80 deletions

View File

@ -126,6 +126,7 @@ jobs:
SELECTED_PM: << parameters.pm >>
NX_E2E_RUN_CYPRESS: 'true'
NX_VERBOSE_LOGGING: 'false'
NX_NATIVE_LOGGING: 'false'
NX_PERF_LOGGING: 'false'
steps:
- run:
@ -154,6 +155,7 @@ jobs:
NX_VERBOSE_LOGGING: 'false'
NX_DAEMON: 'true'
NX_PERF_LOGGING: 'false'
NX_NATIVE_LOGGING: 'false'
steps:
- run:
name: Set dynamic nx run variable

1575
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,8 @@
[workspace]
members = ['packages/nx']
members = [
'packages/nx'
]
[profile.release]
lto = true

View File

@ -6,6 +6,7 @@
"tasksRunnerOptions": {
"default": {
"runner": "nx-cloud",
"nativeWatcher": true,
"options": {
"accessToken": "NDg1NTA3MTAtOGFmZC00YmIwLTk2Y2MtOTkzNzc4ZTczYTlkfHJlYWQtb25seQ==",
"cacheableOperations": [

View File

@ -5,11 +5,23 @@ edition = '2021'
[dependencies]
xxhash-rust = { version = '0.8.5', features = ['xxh3', 'xxh64'] }
napi = { version = '2.10.2', default-features = false, features = ['napi4'] }
napi = { version = '2.12.6', default-features = false, features = ['anyhow', 'napi4', 'tokio_rt'] }
napi-derive = '2.9.3'
ignore = '0.4'
crossbeam-channel = '0.5'
ignore-files = "1.3.0"
watchexec = "2.3.0"
watchexec-filterer-ignore = "1.2.1"
watchexec-events = "1.0.0"
watchexec-signals = "1.0.0"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"]}
anyhow = "1.0.71"
itertools = "0.10.5"
rayon = "1.7.0"
[lib]
crate-type = ['cdylib']

View File

@ -12,6 +12,11 @@
"dist": "packages/nx/src/native",
"jsFile": "packages/nx/src/native/index.js",
"release": true
},
"configurations": {
"local": {
"release": false
}
}
},
"artifacts": {

View File

@ -96,6 +96,10 @@ export interface NxJsonConfiguration<T = '*' | string[]> {
* Default options for the runner
*/
options?: any;
/**
* Enables the Rust watcher within the daemon
*/
nativeWatcher?: boolean;
};
};
/**

View File

@ -339,6 +339,10 @@ export class DaemonClient {
this._out = await open(DAEMON_OUTPUT_LOG_FILE, 'a');
this._err = await open(DAEMON_OUTPUT_LOG_FILE, 'a');
if (this.nxJson.tasksRunnerOptions.default?.nativeWatcher) {
DAEMON_ENV_SETTINGS['NX_NATIVE_WATCHER'] = true;
}
const backgroundProcess = spawn(
process.execPath,
[join(__dirname, '../server/start.js')],
@ -352,7 +356,6 @@ export class DaemonClient {
}
);
backgroundProcess.unref();
//
/**
* Ensure the server is actually available to connect to via IPC before resolving

View File

@ -10,15 +10,19 @@ import {
import { serverLogger } from './logger';
import {
getOutputsWatcherSubscription,
getOutputWatcherInstance,
getSourceWatcherSubscription,
getWatcherInstance,
handleServerProcessTermination,
resetInactivityTimeout,
respondToClient,
respondWithErrorAndExit,
SERVER_INACTIVITY_TIMEOUT_MS,
storeOutputsWatcherSubscription,
storeOutputWatcherInstance,
storeProcessJsonSubscription,
storeSourceWatcherSubscription,
storeWatcherInstance,
} from './shutdown-utils';
import {
convertChangeEventsToLogMessage,
@ -26,6 +30,8 @@ import {
subscribeToWorkspaceChanges,
FileWatcherCallback,
subscribeToServerProcessJsonChanges,
watchWorkspace,
watchOutputFiles,
} from './watcher';
import { addUpdatedAndDeletedFiles } from './project-graph-incremental-recomputation';
import { existsSync, statSync } from 'fs';
@ -232,6 +238,10 @@ function registerProcessTerminationListeners() {
}
async function registerProcessServerJsonTracking() {
if (useNativeWatcher()) {
return;
}
storeProcessJsonSubscription(
await subscribeToServerProcessJsonChanges(async () => {
if (getDaemonProcessIdSync() !== process.pid) {
@ -312,12 +322,13 @@ const handleWorkspaceChanges: FileWatcherCallback = async (
}
if (err || !changeEvents || !changeEvents.length) {
let error = typeof err === 'string' ? new Error(err) : err;
serverLogger.watcherLog(
'Unexpected workspace watcher error',
err.message
error.message
);
console.error(err);
workspaceWatcherError = err;
console.error(error);
workspaceWatcherError = error;
return;
}
@ -361,15 +372,21 @@ const handleWorkspaceChanges: FileWatcherCallback = async (
const handleOutputsChanges: FileWatcherCallback = async (err, changeEvents) => {
try {
if (err || !changeEvents || !changeEvents.length) {
serverLogger.watcherLog('Unexpected outputs watcher error', err.message);
console.error(err);
outputsWatcherError = err;
let error = typeof err === 'string' ? new Error(err) : err;
serverLogger.watcherLog(
'Unexpected outputs watcher error',
error.message
);
console.error(error);
outputsWatcherError = error;
disableOutputsTracking();
return;
}
if (outputsWatcherError) {
return;
}
serverLogger.watcherLog('Processing file changes in outputs');
processFileChangesInOutputs(changeEvents);
} catch (err) {
serverLogger.watcherLog(`Unexpected outputs watcher error`, err.message);
@ -398,9 +415,29 @@ export async function startServer(): Promise<Server> {
// this triggers the storage of the lock file hash
daemonIsOutdated();
if (useNativeWatcher()) {
if (!getWatcherInstance()) {
storeWatcherInstance(
await watchWorkspace(server, handleWorkspaceChanges)
);
serverLogger.watcherLog(
`Subscribed to changes within: ${workspaceRoot} (native)`
);
}
if (!getOutputWatcherInstance()) {
storeOutputWatcherInstance(
await watchOutputFiles(handleOutputsChanges)
);
}
} else {
if (!getSourceWatcherSubscription()) {
storeSourceWatcherSubscription(
await subscribeToWorkspaceChanges(server, handleWorkspaceChanges)
await subscribeToWorkspaceChanges(
server,
handleWorkspaceChanges
)
);
serverLogger.watcherLog(
`Subscribed to changes within: ${workspaceRoot}`
@ -418,6 +455,7 @@ export async function startServer(): Promise<Server> {
} else {
disableOutputsTracking();
}
}
return resolve(server);
} catch (err) {
@ -429,3 +467,7 @@ export async function startServer(): Promise<Server> {
}
});
}
function useNativeWatcher() {
return process.env.NX_NATIVE_WATCHER === 'true';
}

View File

@ -4,6 +4,7 @@ import { serverLogger } from './logger';
import { serializeResult } from '../socket-utils';
import type { AsyncSubscription } from '@parcel/watcher';
import { deleteDaemonJsonProcessCache } from '../cache';
import type { Watcher } from '../../native';
export const SERVER_INACTIVITY_TIMEOUT_MS = 10800000 as const; // 10800000 ms = 3 hours
@ -32,6 +33,22 @@ export function storeProcessJsonSubscription(s: AsyncSubscription) {
processJsonSubscription = s;
}
let watcherInstance: Watcher | undefined;
export function storeWatcherInstance(instance: Watcher) {
watcherInstance = instance;
}
export function getWatcherInstance() {
return watcherInstance;
}
let outputWatcherInstance: Watcher | undefined;
export function storeOutputWatcherInstance(instance: Watcher) {
outputWatcherInstance = instance;
}
export function getOutputWatcherInstance() {
return outputWatcherInstance;
}
interface HandleServerProcessTerminationParams {
server: Server;
reason: string;
@ -62,6 +79,21 @@ export async function handleServerProcessTermination({
`Unsubscribed from changes within: ${workspaceRoot} (server-process.json)`
);
}
if (watcherInstance) {
await watcherInstance.stop();
serverLogger.watcherLog(
`Stopping the watcher for ${workspaceRoot} (sources)`
);
}
if (outputWatcherInstance) {
await outputWatcherInstance.stop();
serverLogger.watcherLog(
`Stopping the watcher for ${workspaceRoot} (outputs)`
);
}
serverLogger.log(`Server stopped because: "${reason}"`);
} finally {
process.exit(0);

View File

@ -18,13 +18,14 @@ import {
getIgnoreObject,
} from '../../utils/ignore';
import { platform } from 'os';
import { serverProcessJsonPath } from '../cache';
import { getDaemonProcessIdSync, serverProcessJsonPath } from '../cache';
import type { WatchEvent } from '../../native';
const ALWAYS_IGNORE = [...getAlwaysIgnore(workspaceRoot), FULL_OS_SOCKET_PATH];
export type FileWatcherCallback = (
err: Error | null,
changeEvents: Event[] | null
err: Error | string | null,
changeEvents: Event[] | WatchEvent[] | null
) => Promise<void>;
export async function subscribeToOutputsChanges(
@ -52,6 +53,69 @@ export async function subscribeToOutputsChanges(
);
}
export async function watchWorkspace(server: Server, cb: FileWatcherCallback) {
const { Watcher } = await import('../../native');
let relativeServerProcess = normalizePath(
relative(workspaceRoot, serverProcessJsonPath)
);
let watcher = new Watcher(workspaceRoot, [`!${relativeServerProcess}`]);
watcher.watch((err, events) => {
if (err) {
return cb(err, null);
}
for (const event of events) {
if (
event.path == relativeServerProcess &&
getDaemonProcessIdSync() !== process.pid
) {
handleServerProcessTermination({
server,
reason: 'this process is no longer the current daemon (native)',
});
}
if (event.path.endsWith('.gitignore') || event.path === '.nxignore') {
// If the ignore files themselves have changed we need to dynamically update our cached ignoreGlobs
handleServerProcessTermination({
server,
reason:
'Stopping the daemon the set of ignored files changed (native)',
});
}
}
cb(null, events);
});
return watcher;
}
export async function watchOutputFiles(cb: FileWatcherCallback) {
const { Watcher } = await import('../../native');
let watcher = new Watcher(workspaceRoot, null, false);
watcher.watch((err, events) => {
if (err) {
return cb(err, null);
}
for (const event of events) {
if (
event.path.startsWith('.git') ||
event.path.includes('node_modules')
) {
return;
}
}
cb(null, events);
});
return watcher;
}
export async function subscribeToWorkspaceChanges(
server: Server,
cb: FileWatcherCallback

View File

@ -10,3 +10,25 @@ export interface FileData {
export function hashArray(input: Array<string>): string
export function hashFile(file: string): FileData | null
export function hashFiles(workspaceRoot: string): Record<string, string>
/**
* Newly created files will have the `update` EventType as well.
* This simplifies logic between OS's, IDEs and git operations
*/
export const enum EventType {
delete = 'delete',
update = 'update'
}
export interface WatchEvent {
path: string
type: EventType
}
export class Watcher {
origin: string
/**
* Creates a new Watcher instance.
* If `useIgnore` is set to false, no ignores will be used, even when `additionalGlobs` is set
*/
constructor(origin: string, additionalGlobs?: Array<string> | undefined | null, useIgnore?: boolean | undefined | null)
watch(callback: (err: string | null, events: WatchEvent[]) => void): void
stop(): Promise<void>
}

View File

@ -246,8 +246,10 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}
const { hashArray, hashFile, hashFiles } = nativeBinding
const { hashArray, hashFile, hashFiles, EventType, Watcher } = nativeBinding
module.exports.hashArray = hashArray
module.exports.hashFile = hashFile
module.exports.hashFiles = hashFiles
module.exports.EventType = EventType
module.exports.Watcher = Watcher

View File

@ -1 +1,2 @@
pub mod native_hasher;
pub mod watch;

View File

@ -19,7 +19,7 @@ pub struct FileData {
fn hash_array(input: Vec<String>) -> String {
let joined = input.join(",");
let content = joined.as_bytes();
return xxh3::xxh3_64(content).to_string();
xxh3::xxh3_64(content).to_string()
}
#[napi]
@ -40,7 +40,7 @@ fn hash_files(workspace_root: String) -> HashMap<String, String> {
let git_folder = workspace_root.join(".git");
let node_folder = workspace_root.join("node_modules");
let mut walker = WalkBuilder::new(&workspace_root);
let mut walker = WalkBuilder::new(workspace_root);
walker.hidden(false);
walker.add_custom_ignore_filename(&nx_ignore);

View File

@ -1,8 +1,9 @@
import { hashFile, hashArray } from '../index';
import { hashArray, hashFile, Watcher } from '../index';
import { tmpdir } from 'os';
import { mkdtemp, writeFile } from 'fs-extra';
import { mkdtemp, realpathSync, writeFile } from 'fs-extra';
import { join } from 'path';
import { TempFs } from '../../utils/testing/temp-fs';
describe('native', () => {
it('should hash files', async () => {
@ -18,7 +19,7 @@ describe('native', () => {
it('should hash content', async () => {
expect(hashArray).toBeDefined();
expect(hashArray(["one", "two"])).toEqual("10960201262927338690")
expect(hashArray(['one', 'two'])).toEqual('10960201262927338690');
});
it('should create an instance of NativeHasher', () => {
@ -26,3 +27,142 @@ describe('native', () => {
// expect(nativeHasher instanceof NativeFileHasher).toBe(true);
});
});
describe('watcher', () => {
let temp: TempFs;
let watcher: Watcher;
beforeEach(() => {
temp = new TempFs('watch-dir');
temp.createFilesSync({
'.gitignore': 'node_modules/',
'.nxignore': 'app2/',
'app1/main.js': '',
'app1/main.css': '',
'app2/main.js': '',
'nested-ignore/.gitignore': '*',
'nested-ignore/file.js': '',
'node_modules/module/index.js': '',
});
console.log(`watching ${temp.tempDir}`);
});
afterEach(() => {
watcher.stop();
temp.cleanup();
});
it('should trigger the callback for files that are not ignored', (done) => {
watcher = new Watcher(realpathSync(temp.tempDir));
watcher.watch((error, paths) => {
expect(paths).toMatchInlineSnapshot(`
[
{
"path": "app1/main.html",
"type": "update",
},
]
`);
done();
});
wait().then(() => {
temp.createFileSync('node_modules/my-file.json', JSON.stringify({}));
temp.createFileSync('app2/main.css', JSON.stringify({}));
temp.createFileSync('app1/main.html', JSON.stringify({}));
});
});
it('should trigger the callback when files are updated', (done) => {
watcher = new Watcher(realpathSync(temp.tempDir));
watcher.watch((err, paths) => {
expect(paths).toMatchInlineSnapshot(`
[
{
"path": "app1/main.js",
"type": "update",
},
]
`);
done();
});
wait().then(() => {
// nxignored file should not trigger a callback
temp.appendFile('app2/main.js', 'update');
temp.appendFile('app1/main.js', 'update');
});
});
it('should watch file renames', (done) => {
watcher = new Watcher(realpathSync(temp.tempDir));
watcher.watch((err, paths) => {
expect(paths.length).toBe(2);
expect(paths.find((p) => p.type === 'update')).toMatchObject({
path: 'app1/rename.js',
type: 'update',
});
expect(paths.find((p) => p.type === 'delete')).toMatchObject({
path: 'app1/main.js',
type: 'delete',
});
done();
});
wait().then(() => {
temp.renameFile('app1/main.js', 'app1/rename.js');
});
});
it('should trigger on deletes', (done) => {
watcher = new Watcher(realpathSync(temp.tempDir));
watcher.watch((err, paths) => {
expect(paths).toMatchInlineSnapshot(`
[
{
"path": "app1/main.js",
"type": "delete",
},
]
`);
done();
});
wait().then(() => {
temp.removeFileSync('app1/main.js');
});
});
it('should ignore nested gitignores', (done) => {
watcher = new Watcher(realpathSync(temp.tempDir));
watcher.watch((err, paths) => {
expect(paths).toMatchInlineSnapshot(`
[
{
"path": "boo.txt",
"type": "update",
},
]
`);
done();
});
wait().then(() => {
// should not be triggered
temp.createFileSync('nested-ignore/hello1.txt', '');
temp.createFileSync('boo.txt', '');
});
});
});
function wait() {
return new Promise<void>((res) => {
setTimeout(() => {
res();
}, 500);
});
}

View File

@ -0,0 +1,5 @@
mod types;
mod utils;
mod watch_config;
mod watch_filterer;
mod watcher;

View File

@ -0,0 +1,78 @@
use napi::bindgen_prelude::*;
use std::path::PathBuf;
use tracing::trace;
use watchexec_events::{Event, Tag};
#[napi(string_enum)]
#[derive(Debug)]
/// Newly created files will have the `update` EventType as well.
/// This simplifies logic between OS's, IDEs and git operations
pub enum EventType {
#[allow(non_camel_case_types)]
delete,
#[allow(non_camel_case_types)]
update,
}
#[derive(Debug, Clone)]
#[napi(object)]
pub struct WatchEvent {
pub path: String,
pub r#type: EventType,
}
impl From<WatchEventInternal> for WatchEvent {
fn from(value: WatchEventInternal) -> Self {
let path = value
.path
.strip_prefix(&value.origin.expect("origin is available"))
.unwrap_or(&value.path)
.display()
.to_string();
#[cfg(windows)]
let path = path.replace('\\', "/");
WatchEvent {
path,
r#type: value.r#type,
}
}
}
#[derive(Debug, Clone)]
pub(super) struct WatchEventInternal {
pub path: PathBuf,
pub r#type: EventType,
pub origin: Option<String>,
}
impl From<&Event> for WatchEventInternal {
fn from(value: &Event) -> Self {
let path = value.paths().next().expect("there should always be a path");
let event_kind = value
.tags
.iter()
.find_map(|t| match t {
Tag::FileEventKind(event_kind) => Some(event_kind),
_ => None,
})
.expect("there should always be a file event kind");
let path_ref = path.0;
let event_type = if matches!(path.1, None) && !path_ref.exists() {
EventType::delete
} else {
EventType::update
};
trace!(?path, ?event_kind, ?event_type, "event kind -> event type");
WatchEventInternal {
path: path.0.into(),
r#type: event_type,
origin: None,
}
}
}

View File

@ -0,0 +1,47 @@
use ignore::WalkBuilder;
use ignore_files::IgnoreFile;
use std::path::PathBuf;
pub(super) fn get_ignore_files<T: AsRef<str>>(root: T) -> Vec<IgnoreFile> {
let root = root.as_ref();
let mut walker = WalkBuilder::new(root);
walker.hidden(false);
walker.git_ignore(false);
let node_folder = PathBuf::from(root).join("node_modules");
walker.filter_entry(move |entry| !entry.path().starts_with(&node_folder));
walker
.build()
.flatten()
.filter(|result| {
result.path().ends_with(".nxignore") || result.path().ends_with(".gitignore")
})
.map(|result| {
let path: PathBuf = result.path().into();
let parent: PathBuf = path.parent().unwrap_or(&path).into();
IgnoreFile {
path,
applies_in: Some(parent),
applies_to: None,
}
})
.collect()
}
// /// Get only the root level folders to watch.
// /// These will not include git ignored folders
// pub(super) fn get_watch_directories<T: AsRef<str>>(root: T) -> Vec<PathBuf> {
// let root = root.as_ref();
//
// let mut walker = WalkBuilder::new(root);
// walker.hidden(false);
// walker.max_depth(Some(1));
// walker.filter_entry(|entry| entry.path().is_dir());
//
// walker
// .build()
// .flatten()
// .map(|result| result.path().into())
// .collect()
// }

View File

@ -0,0 +1,45 @@
use crate::native::watch::utils::get_ignore_files;
use crate::native::watch::watch_filterer::WatchFilterer;
use ignore_files::IgnoreFilter;
use std::sync::Arc;
use std::time::Duration;
use tracing::trace;
use watchexec::config::RuntimeConfig;
use watchexec_filterer_ignore::IgnoreFilterer;
pub(super) async fn create_runtime(
origin: &str,
additional_globs: &[&str],
use_ignore: bool,
) -> napi::Result<RuntimeConfig> {
let ignore_files = if use_ignore {
get_ignore_files(origin)
} else {
vec![]
};
trace!(
?use_ignore,
?additional_globs,
?ignore_files,
"Using these ignore files for the watcher"
);
let mut filter = IgnoreFilter::new(origin, &ignore_files)
.await
.map_err(anyhow::Error::from)?;
filter
.add_globs(&additional_globs, Some(&origin.into()))
.map_err(anyhow::Error::from)?;
let mut runtime = RuntimeConfig::default();
runtime.filterer(Arc::new(WatchFilterer {
inner: IgnoreFilterer(filter),
}));
runtime.action_throttle(Duration::from_millis(500));
// let watch_directories = get_watch_directories(origin);
// trace!(directories = ?watch_directories, "watching");
runtime.pathset([&origin]);
Ok(runtime)
}

View File

@ -0,0 +1,64 @@
use tracing::trace;
use watchexec::error::RuntimeError;
use watchexec::filter::Filterer;
use watchexec_events::filekind::{CreateKind, FileEventKind, ModifyKind, RemoveKind};
use watchexec_events::{Event, FileType, Priority, Source, Tag};
use watchexec_filterer_ignore::IgnoreFilterer;
#[derive(Debug)]
pub struct WatchFilterer {
pub inner: IgnoreFilterer,
}
/// Used to filter out events that that come from watchexec
impl Filterer for WatchFilterer {
fn check_event(&self, event: &Event, priority: Priority) -> Result<bool, RuntimeError> {
if !self.inner.check_event(event, priority)? {
return Ok(false);
}
trace!(?event, "checking if event is valid");
//
// Tags will be a Vec that contains multiple types of information for a given event
// We are only interested if:
// 1) A `FileEventKind` is modified, created, removed, or renamed
// 2) A Path that is a FileType::File
// 3) Deleted files do not have a FileType::File (because they're deleted..), check if a path is valid
// 4) Only FileSystem sources are valid
// If there's a tag that doesnt confine to this criteria, we `return` early, otherwise we `continue`.
for tag in &event.tags {
match tag {
// Tag::Source(Source::Keyboard) => continue,
// Tag::Keyboard(Keyboard::Eof) => continue,
Tag::FileEventKind(file_event) => match file_event {
FileEventKind::Modify(ModifyKind::Name(_)) => continue,
FileEventKind::Modify(ModifyKind::Data(_)) => continue,
FileEventKind::Create(CreateKind::File) => continue,
FileEventKind::Remove(RemoveKind::File) => continue,
#[cfg(windows)]
FileEventKind::Modify(ModifyKind::Any) => continue,
#[cfg(windows)]
FileEventKind::Create(CreateKind::Any) => continue,
#[cfg(windows)]
FileEventKind::Remove(RemoveKind::Any) => continue,
_ => return Ok(false),
},
// Deleted files do not have a file_type + we don't want directory changes + we dont want files that end with `~`
Tag::Path {
path,
file_type: Some(FileType::File) | None,
} if !path.display().to_string().ends_with('~') => continue,
Tag::Source(Source::Filesystem) => continue,
_ => return Ok(false),
}
}
trace!(?event, "event passed all checks");
Ok(true)
}
}

View File

@ -0,0 +1,203 @@
use std::collections::HashMap;
use std::convert::Infallible;
use std::path::MAIN_SEPARATOR;
use std::sync::Arc;
use crate::native::watch::types::{WatchEvent, WatchEventInternal};
use itertools::Itertools;
use napi::bindgen_prelude::*;
use napi::threadsafe_function::{
ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
};
use napi::{Env, JsFunction, JsObject};
use rayon::prelude::*;
use tracing::trace;
use tracing_subscriber::EnvFilter;
use watchexec::action::{Action, Outcome};
use watchexec::config::{InitConfig, RuntimeConfig};
use watchexec::event::Tag;
use watchexec::Watchexec;
use watchexec_events::{Event, Keyboard, Priority};
use watchexec_signals::Signal;
use crate::native::watch::watch_config;
#[napi]
pub struct Watcher {
pub origin: String,
watch_exec: Arc<Watchexec>,
additional_globs: Vec<String>,
use_ignore: bool,
}
#[napi]
impl Watcher {
/// Creates a new Watcher instance.
/// If `useIgnore` is set to false, no ignores will be used, even when `additionalGlobs` is set
#[napi(constructor)]
pub fn new(
origin: String,
additional_globs: Option<Vec<String>>,
use_ignore: Option<bool>,
) -> Result<Watcher> {
let watch_exec = Watchexec::new(InitConfig::default(), RuntimeConfig::default())
.map_err(anyhow::Error::from)?;
let mut globs = if let Some(globs) = additional_globs {
globs
} else {
vec![]
};
// always ignore the .git and node_modules folder
globs.push(".git/".into());
globs.push("node_modules/".into());
Ok(Watcher {
origin,
watch_exec,
additional_globs: globs,
use_ignore: use_ignore.unwrap_or(true),
})
}
#[napi]
pub fn watch(
&mut self,
env: Env,
#[napi(ts_arg_type = "(err: string | null, events: WatchEvent[]) => void")]
callback: JsFunction,
) -> Result<()> {
_ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_env("NX_NATIVE_LOGGING"))
.try_init();
let mut callback_tsfn: ThreadsafeFunction<HashMap<String, Vec<WatchEventInternal>>> =
callback.create_threadsafe_function(
0,
|ctx: ThreadSafeCallContext<HashMap<String, Vec<WatchEventInternal>>>| {
let mut watch_events: Vec<WatchEvent> = vec![];
trace!(?ctx.value, "Base collection that will be sent");
for (_, value) in ctx.value {
let event = value
.first()
.expect("should always have at least 1 element")
.to_owned();
watch_events.push(event.into());
}
trace!(?watch_events, "sending to node");
Ok(vec![watch_events])
},
)?;
callback_tsfn.unref(&env)?;
let origin = self.origin.clone();
let watch_exec = self.watch_exec.clone();
let additional_globs = self.additional_globs.clone();
let use_ignore = self.use_ignore.clone();
let start = async move {
let mut runtime = watch_config::create_runtime(
&origin,
&additional_globs
.iter()
.map(String::as_ref)
.collect::<Vec<_>>(),
use_ignore,
)
.await?;
runtime.on_action(move |action: Action| {
let ok_future = async { Ok::<(), Infallible>(()) };
let signals: Vec<Signal> = action.events.iter().flat_map(Event::signals).collect();
if signals.contains(&Signal::Terminate) {
trace!("terminate - ending watch");
action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
return ok_future;
}
if signals.contains(&Signal::Interrupt) {
trace!("interrupt - ending watch");
action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
return ok_future;
}
let is_keyboard_eof = action
.events
.iter()
.any(|e| e.tags.contains(&Tag::Keyboard(Keyboard::Eof)));
if is_keyboard_eof {
trace!("ending watch");
action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
return ok_future;
}
let mut origin_path = origin.clone();
if !origin_path.ends_with(MAIN_SEPARATOR) {
origin_path.push(MAIN_SEPARATOR);
}
trace!(?origin_path);
let events = action
.events
.par_iter()
.map(|ev| {
let mut watch_event: WatchEventInternal = ev.into();
watch_event.origin = Some(origin_path.clone());
watch_event
})
.collect::<Vec<WatchEventInternal>>();
let group_events = events
.into_iter()
.into_group_map_by(|g| g.path.display().to_string());
callback_tsfn.call(Ok(group_events), ThreadsafeFunctionCallMode::NonBlocking);
action.outcome(Outcome::Start);
ok_future
});
trace!("configuring watch exec");
watch_exec
.reconfigure(runtime)
.map_err(anyhow::Error::from)?;
trace!("starting watch exec");
watch_exec.main().await.map_err(anyhow::Error::from)?.ok();
Ok(())
};
env.spawn_future(start)?;
trace!("started watch exec");
Ok(())
}
#[napi(ts_return_type = "Promise<void>")]
pub fn stop(&mut self, env: Env) -> Result<JsObject> {
trace!("stopping the watch process");
let watch_exec = self.watch_exec.clone();
let send_terminate = async move {
watch_exec
.send_event(
Event {
tags: vec![Tag::Signal(Signal::Terminate)],
metadata: HashMap::new(),
},
Priority::Urgent,
)
.await
.map_err(anyhow::Error::from)?;
Ok(())
};
env.spawn_future(send_terminate)
}
}

View File

@ -6,9 +6,11 @@ import {
outputFile,
rmSync,
emptyDirSync,
outputFileSync,
unlinkSync,
} from 'fs-extra';
import { joinPathFragments } from '../path';
import { appendFileSync, writeFileSync } from 'fs';
import { appendFileSync, writeFileSync, renameSync } from 'fs';
type NestedFiles = {
[fileName: string]: string;
@ -31,14 +33,28 @@ export class TempFs {
);
}
createFilesSync(fileObject: NestedFiles) {
for (let path of Object.keys(fileObject)) {
this.createFileSync(path, fileObject[path]);
}
}
async createFile(filePath: string, content: string) {
await outputFile(joinPathFragments(this.tempDir, filePath), content);
}
createFileSync(filePath: string, content: string) {
outputFileSync(joinPathFragments(this.tempDir, filePath), content);
}
async readFile(filePath: string): Promise<string> {
return await readFile(filePath, 'utf-8');
}
removeFileSync(filePath: string): void {
unlinkSync(joinPathFragments(this.tempDir, filePath));
}
appendFile(filePath: string, content: string) {
appendFileSync(joinPathFragments(this.tempDir, filePath), content);
}
@ -46,6 +62,12 @@ export class TempFs {
writeFile(filePath: string, content: string) {
writeFileSync(joinPathFragments(this.tempDir, filePath), content);
}
renameFile(oldPath: string, newPath: string) {
renameSync(
joinPathFragments(this.tempDir, oldPath),
joinPathFragments(this.tempDir, newPath)
);
}
cleanup() {
rmSync(this.tempDir, { recursive: true });