feat(core): create a new function to run child processes via rust (#21070)

Co-authored-by: Jonathan Cammisuli <jon@cammisuli.ca>
Co-authored-by: Emily Xiong <xiongemi@gmail.com>
This commit is contained in:
Jason Jean 2024-01-17 02:56:59 -05:00 committed by GitHub
parent 78a4df8d26
commit d4f3e63a4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 963 additions and 67 deletions

205
Cargo.lock generated
View File

@ -194,9 +194,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.3.3"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42"
checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07"
[[package]]
name = "bitvec"
@ -289,7 +289,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a68fa787550392a9d58f44c21a3022cfb3ea3e2458b7f85d3b399d0ceeccf409"
dependencies = [
"async-trait",
"nix",
"nix 0.27.1",
"tokio",
"winapi",
]
@ -333,7 +333,7 @@ dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"memoffset 0.8.0",
"scopeguard",
]
@ -391,6 +391,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "downcast-rs"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650"
[[package]]
name = "dunce"
version = "1.0.4"
@ -411,23 +417,12 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]]
name = "errno"
version = "0.3.1"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
dependencies = [
"errno-dragonfly",
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "errno-dragonfly"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
dependencies = [
"cc",
"libc",
"windows-sys 0.52.0",
]
[[package]]
@ -460,6 +455,17 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "filedescriptor"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7199d965852c3bac31f779ef99cbb4537f80e952e2d6aa0ffeb30cce00f4f46e"
dependencies = [
"libc",
"thiserror",
"winapi",
]
[[package]]
name = "filetime"
version = "0.2.23"
@ -667,7 +673,7 @@ version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e0be46f4cf1f8f9e88d0e3eb7b29718aff23889563249f379119bd1ab6910e"
dependencies = [
"bitflags 2.3.3",
"bitflags 2.4.1",
"bstr",
"gix-path",
"libc",
@ -715,7 +721,7 @@ version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5db19298c5eeea2961e5b3bf190767a2d1f09b8802aeb5f258e42276350aff19"
dependencies = [
"bitflags 2.3.3",
"bitflags 2.4.1",
"bstr",
"gix-features",
"gix-path",
@ -801,7 +807,7 @@ version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78f6dce0c6683e2219e8169aac4b1c29e89540a8262fef7056b31d80d969408c"
dependencies = [
"bitflags 2.3.3",
"bitflags 2.4.1",
"gix-path",
"libc",
"windows",
@ -1026,6 +1032,15 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "ioctl-rs"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7970510895cee30b3e9128319f2cefd4bde883a39f38baa279567ba3a7eb97d"
dependencies = [
"libc",
]
[[package]]
name = "is-macro"
version = "0.3.0"
@ -1218,6 +1233,15 @@ dependencies = [
"libc",
]
[[package]]
name = "memoffset"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
[[package]]
name = "memoffset"
version = "0.8.0"
@ -1284,7 +1308,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49ac8112fe5998579b22e29903c7b277fc7f91c7860c0236f35792caf8156e18"
dependencies = [
"anyhow",
"bitflags 2.3.3",
"bitflags 2.4.1",
"ctor",
"napi-derive",
"napi-sys",
@ -1351,13 +1375,27 @@ dependencies = [
"smallvec",
]
[[package]]
name = "nix"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4"
dependencies = [
"autocfg",
"bitflags 1.3.2",
"cfg-if",
"libc",
"memoffset 0.6.5",
"pin-utils",
]
[[package]]
name = "nix"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags 2.3.3",
"bitflags 2.4.1",
"cfg-if",
"libc",
]
@ -1384,7 +1422,7 @@ version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [
"bitflags 2.3.3",
"bitflags 2.4.1",
"crossbeam-channel",
"filetime",
"fsevent-sys",
@ -1479,6 +1517,7 @@ dependencies = [
"nom",
"once_cell",
"parking_lot",
"portable-pty",
"rayon",
"regex",
"rkyv",
@ -1487,6 +1526,7 @@ dependencies = [
"swc_ecma_dep_graph",
"swc_ecma_parser",
"swc_ecma_visit",
"term_size",
"thiserror",
"tokio",
"tracing",
@ -1591,6 +1631,27 @@ dependencies = [
"syn 2.0.46",
]
[[package]]
name = "portable-pty"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806ee80c2a03dbe1a9fb9534f8d19e4c0546b790cde8fd1fea9d6390644cb0be"
dependencies = [
"anyhow",
"bitflags 1.3.2",
"downcast-rs",
"filedescriptor",
"lazy_static",
"libc",
"log",
"nix 0.25.1",
"serial",
"shared_library",
"shell-words",
"winapi",
"winreg",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -1946,6 +2007,48 @@ dependencies = [
"syn 2.0.46",
]
[[package]]
name = "serial"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1237a96570fc377c13baa1b88c7589ab66edced652e43ffb17088f003db3e86"
dependencies = [
"serial-core",
"serial-unix",
"serial-windows",
]
[[package]]
name = "serial-core"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f46209b345401737ae2125fe5b19a77acce90cd53e1658cda928e4fe9a64581"
dependencies = [
"libc",
]
[[package]]
name = "serial-unix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f03fbca4c9d866e24a459cbca71283f545a37f8e3e002ad8c70593871453cab7"
dependencies = [
"ioctl-rs",
"libc",
"serial-core",
"termios",
]
[[package]]
name = "serial-windows"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c6d3b776267a75d31bbdfd5d36c0ca051251caafc285827052bc53bcdc8162"
dependencies = [
"libc",
"serial-core",
]
[[package]]
name = "sha1_smol"
version = "1.0.0"
@ -1961,6 +2064,22 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shared_library"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a9e7e0f2bfae24d8a5b5a66c5b257a83c7412304311512a0c054cd5e619da11"
dependencies = [
"lazy_static",
"libc",
]
[[package]]
name = "shell-words"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde"
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@ -2119,7 +2238,7 @@ version = "0.107.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5693558188efdd5b664e517b69ba8056a7f64c214ca8cd034e3ae8314566b866"
dependencies = [
"bitflags 2.3.3",
"bitflags 2.4.1",
"is-macro",
"num-bigint",
"scoped-tls",
@ -2264,6 +2383,25 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "term_size"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e4129646ca0ed8f45d09b929036bafad5377103edd06e50bf574b353d2b08d9"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "termios"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5d9cf598a6d7ce700a4e6a9199da127e6819a61e64b68609683cc9a01b5683a"
dependencies = [
"libc",
]
[[package]]
name = "termtree"
version = "0.4.0"
@ -2565,7 +2703,7 @@ dependencies = [
"futures",
"ignore-files 1.3.2",
"miette",
"nix",
"nix 0.27.1",
"normalize-path",
"notify",
"once_cell",
@ -2584,7 +2722,7 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fa905a7f327bfdda78b9c06831d3180a419b7b722bd1ef779ac13ff2ab69df0"
dependencies = [
"nix",
"nix 0.27.1",
"notify",
"watchexec-signals",
]
@ -2611,7 +2749,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af0a778522cf0fc2fa8a8f1380e32893208cb2e7fd33e64de8bd81a00a2a7838"
dependencies = [
"miette",
"nix",
"nix 0.27.1",
"thiserror",
]
@ -2623,7 +2761,7 @@ checksum = "6214815382a9cadf1f0e521e3c28ae4e02541b96622d0e78053f03b730a1437f"
dependencies = [
"command-group",
"futures",
"nix",
"nix 0.27.1",
"tokio",
"tracing",
"watchexec-events",
@ -2887,6 +3025,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "wyz"
version = "0.5.1"

View File

@ -583,7 +583,7 @@ describe('Linter', () => {
const outFlat = runCLI(`affected -t lint`, {
silenceError: true,
});
expect(outFlat).toContain('All files pass linting');
expect(outFlat).toContain('ran target lint');
}, 1000000);
it('should convert standalone to flat config', () => {
@ -616,7 +616,7 @@ describe('Linter', () => {
const outFlat = runCLI(`affected -t lint`, {
silenceError: true,
});
expect(outFlat).toContain('All files pass linting');
expect(outFlat).toContain('ran target lint');
}, 1000000);
});

View File

@ -17,6 +17,7 @@ ignore-files = "2.0.0"
itertools = "0.10.5"
once_cell = "1.18.0"
parking_lot = { version = "0.12.1", features = ["send_guard"] }
portable-pty = "0.8.1"
napi = { version = '2.12.6', default-features = false, features = [
'anyhow',
'napi4',
@ -41,6 +42,7 @@ swc_common = "0.31.16"
swc_ecma_parser = { version = "0.137.1", features = ["typescript"] }
swc_ecma_visit = "0.93.0"
swc_ecma_ast = "0.107.0"
term_size = "0.3.2"
[lib]
crate-type = ['cdylib']

View File

@ -20,7 +20,7 @@ import { NxJsonConfiguration } from '../../config/nx-json';
import { readNxJson } from '../../config/configuration';
import { PromisedBasedQueue } from '../../utils/promised-based-queue';
import { hasNxJson } from '../../config/nx-json';
import { Message, SocketMessenger } from './socket-messenger';
import { Message, DaemonSocketMessenger } from './daemon-socket-messenger';
import { safelyCleanUpExistingProcess } from '../cache';
import { Hash } from '../../hasher/task-hasher';
import { Task, TaskGraph } from '../../config/task-graph';
@ -50,7 +50,7 @@ export class DaemonClient {
}
private queue: PromisedBasedQueue;
private socketMessenger: SocketMessenger;
private socketMessenger: DaemonSocketMessenger;
private currentMessage;
private currentResolve;
@ -172,10 +172,12 @@ export class DaemonClient {
) => void
): Promise<UnregisterCallback> {
await this.getProjectGraphAndSourceMaps();
let messenger: SocketMessenger | undefined;
let messenger: DaemonSocketMessenger | undefined;
await this.queue.sendToQueue(() => {
messenger = new SocketMessenger(connect(FULL_OS_SOCKET_PATH)).listen(
messenger = new DaemonSocketMessenger(
connect(FULL_OS_SOCKET_PATH)
).listen(
(message) => {
try {
const parsedMessage = JSON.parse(message);
@ -248,7 +250,7 @@ export class DaemonClient {
}
private setUpConnection() {
this.socketMessenger = new SocketMessenger(
this.socketMessenger = new DaemonSocketMessenger(
connect(FULL_OS_SOCKET_PATH)
).listen(
(message) => this.handleMessage(message),

View File

@ -8,7 +8,7 @@ export interface Message extends Record<string, any> {
data?: any;
}
export class SocketMessenger {
export class DaemonSocketMessenger {
constructor(private socket: Socket) {}
async sendMessage(messageToDaemon: Message) {

View File

@ -1,7 +1,7 @@
import { unlinkSync } from 'fs';
import { platform } from 'os';
import { resolve } from 'path';
import { DAEMON_SOCKET_PATH } from './tmp-dir';
import { join, resolve } from 'path';
import { DAEMON_SOCKET_PATH, socketDir } from './tmp-dir';
export const isWindows = platform() === 'win32';
@ -15,6 +15,11 @@ export const FULL_OS_SOCKET_PATH = isWindows
? '\\\\.\\pipe\\nx\\' + resolve(DAEMON_SOCKET_PATH)
: resolve(DAEMON_SOCKET_PATH);
export const FORKED_PROCESS_OS_SOCKET_PATH = (id: string) => {
let path = resolve(join(socketDir, 'fp' + id + '.sock'));
return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path);
};
export function killSocketOrPath(): void {
try {
unlinkSync(FULL_OS_SOCKET_PATH);

View File

@ -21,7 +21,7 @@ export const DAEMON_OUTPUT_LOG_FILE = join(
'daemon.log'
);
const socketDir = process.env.NX_DAEMON_SOCKET_DIR || createSocketDir();
export const socketDir = process.env.NX_DAEMON_SOCKET_DIR || createSocketDir();
export const DAEMON_SOCKET_PATH = join(
socketDir,

View File

@ -4,6 +4,7 @@ import * as yargsParser from 'yargs-parser';
import { env as appendLocalEnv } from 'npm-run-path';
import { ExecutorContext } from '../../config/misc-interfaces';
import * as chalk from 'chalk';
import { runCommand } from '../../native';
export const LARGE_BUFFER = 1024 * 1000000;
@ -121,7 +122,8 @@ async function runInParallel(
options.readyWhen,
options.color,
calculateCwd(options.cwd, context),
options.env ?? {}
options.env ?? {},
true
).then((result) => ({
result,
command: c.command,
@ -187,7 +189,8 @@ async function runSerially(
undefined,
options.color,
calculateCwd(options.cwd, context),
options.env ?? {}
options.env ?? {},
false
);
if (!success) {
process.stderr.write(
@ -200,7 +203,7 @@ async function runSerially(
return true;
}
function createProcess(
async function createProcess(
commandConfig: {
command: string;
color?: string;
@ -210,12 +213,50 @@ function createProcess(
readyWhen: string,
color: boolean,
cwd: string,
env: Record<string, string>
env: Record<string, string>,
isParallel: boolean
): Promise<boolean> {
env = processEnv(color, cwd, env);
// The rust runCommand is always a tty, so it will not look nice in parallel and if we need prefixes
// currently does not work properly in windows
if (
process.env.NX_NATIVE_COMMAND_RUNNER !== 'false' &&
process.stdout.isTTY &&
!commandConfig.prefix &&
!isParallel
) {
const cp = runCommand(commandConfig.command, cwd, env);
return new Promise((res) => {
cp.onOutput((output) => {
if (readyWhen && output.indexOf(readyWhen) > -1) {
res(true);
}
});
cp.onExit((code) => res(code === 0));
});
}
return nodeProcess(commandConfig, color, cwd, env, readyWhen);
}
function nodeProcess(
commandConfig: {
command: string;
color?: string;
bgColor?: string;
prefix?: string;
},
color: boolean,
cwd: string,
env: Record<string, string>,
readyWhen: string
): Promise<boolean> {
return new Promise((res) => {
const childProcess = exec(commandConfig.command, {
maxBuffer: LARGE_BUFFER,
env: processEnv(color, cwd, env),
env,
cwd,
});
/**

View File

@ -0,0 +1,212 @@
use std::{
collections::HashMap,
io::{BufReader, Read, Write},
};
use anyhow::anyhow;
use crossbeam_channel::{bounded, unbounded, Receiver};
use napi::threadsafe_function::ErrorStrategy::Fatal;
use napi::threadsafe_function::ThreadsafeFunction;
use napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking;
use napi::{Env, JsFunction};
use portable_pty::{ChildKiller, CommandBuilder, NativePtySystem, PtySize, PtySystem};
fn command_builder() -> CommandBuilder {
if cfg!(target_os = "windows") {
let comspec = std::env::var("COMSPEC");
let shell = comspec
.as_ref()
.map(|v| v.as_str())
.unwrap_or_else(|_| "cmd.exe");
let mut command = CommandBuilder::new(shell);
command.arg("/C");
command
} else {
let mut command = CommandBuilder::new("sh");
command.arg("-c");
command
}
}
pub enum ChildProcessMessage {
Kill,
}
#[napi]
pub struct ChildProcess {
process_killer: Box<dyn ChildKiller + Sync + Send>,
message_receiver: Receiver<String>,
wait_receiver: Receiver<u32>,
}
#[napi]
impl ChildProcess {
pub fn new(
process_killer: Box<dyn ChildKiller + Sync + Send>,
message_receiver: Receiver<String>,
exit_receiver: Receiver<u32>,
) -> Self {
Self {
process_killer,
message_receiver,
wait_receiver: exit_receiver,
}
}
#[napi]
pub fn kill(&mut self) -> anyhow::Result<()> {
self.process_killer.kill().map_err(anyhow::Error::from)?;
Ok(())
}
#[napi]
pub fn on_exit(
&mut self,
#[napi(ts_arg_type = "(code: number) => void")] callback: JsFunction,
) -> napi::Result<()> {
let wait = self.wait_receiver.clone();
let callback_tsfn: ThreadsafeFunction<u32, Fatal> =
callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
std::thread::spawn(move || {
// we will only get one exit_code here, so we dont need to do a while loop
if let Ok(exit_code) = wait.recv() {
callback_tsfn.call(exit_code, NonBlocking);
}
});
Ok(())
}
#[napi]
pub fn on_output(
&mut self,
env: Env,
#[napi(ts_arg_type = "(message: string) => void")] callback: JsFunction,
) -> napi::Result<()> {
let rx = self.message_receiver.clone();
let mut callback_tsfn: ThreadsafeFunction<String, Fatal> =
callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
callback_tsfn.unref(&env)?;
std::thread::spawn(move || {
while let Ok(content) = rx.recv() {
callback_tsfn.call(content, NonBlocking);
}
});
Ok(())
}
}
fn get_directory(command_dir: Option<String>) -> anyhow::Result<String> {
if let Some(command_dir) = command_dir {
Ok(command_dir)
} else {
std::env::current_dir()
.map(|v| v.to_string_lossy().to_string())
.map_err(|_| {
anyhow!("failed to get current directory, please specify command_dir explicitly")
})
}
}
#[napi]
pub fn run_command(
command: String,
command_dir: Option<String>,
js_env: Option<HashMap<String, String>>,
quiet: Option<bool>,
) -> napi::Result<ChildProcess> {
let command_dir = get_directory(command_dir)?;
let quiet = quiet.unwrap_or(false);
let pty_system = NativePtySystem::default();
let (w, h) = term_size::dimensions().unwrap_or((80, 24));
let pair = pty_system.openpty(PtySize {
rows: h as u16,
cols: w as u16,
pixel_width: 0,
pixel_height: 0,
})?;
let mut cmd = command_builder();
cmd.arg(command.as_str());
cmd.cwd(command_dir);
if let Some(js_env) = js_env {
for (key, value) in js_env {
cmd.env(key, value);
}
}
let (message_tx, message_rx) = unbounded();
let reader = pair.master.try_clone_reader()?;
let mut stdout = std::io::stdout();
std::thread::spawn(move || {
let mut reader = BufReader::new(reader);
let mut buffer = [0; 8 * 1024];
let mut strip_clear_code = cfg!(target_os = "windows");
while let Ok(n) = reader.read(&mut buffer) {
if n == 0 {
break;
}
let mut content = String::from_utf8_lossy(&buffer[..n]).to_string();
if strip_clear_code {
strip_clear_code = false;
// remove clear screen
content = content.replacen("\x1B[2J", "", 1);
// remove cursor position 1,1
content = content.replacen("\x1B[H", "", 1);
}
message_tx.send(content.to_string()).ok();
if !quiet {
stdout.write_all(content.as_bytes()).ok();
stdout.flush().ok();
}
}
});
let mut child = pair.slave.spawn_command(cmd)?;
// Release any handles owned by the slave
// we don't need it now that we've spawned the child.
drop(pair.slave);
let process_killer = child.clone_killer();
let (exit_tx, exit_rx) = bounded(1);
std::thread::spawn(move || {
let exit = child.wait().unwrap();
// make sure that master is only dropped after we wait on the child. Otherwise windows does not like it
drop(pair.master);
exit_tx.send(exit.exit_code()).ok();
});
Ok(ChildProcess::new(process_killer, message_rx, exit_rx))
}
/// This allows us to run a pseudoterminal with a fake node ipc channel
/// this makes it possible to be backwards compatible with the old implementation
#[napi]
pub fn nx_fork(
id: String,
fork_script: String,
psuedo_ipc_path: String,
command_dir: Option<String>,
js_env: Option<HashMap<String, String>>,
quiet: bool,
) -> napi::Result<ChildProcess> {
run_command(
format!("node {} {} {}", fork_script, psuedo_ipc_path, id),
command_dir,
js_env,
Some(quiet),
)
}

View File

@ -21,6 +21,12 @@ export function expandOutputs(directory: string, entries: Array<string>): Array<
export function getFilesForOutputs(directory: string, entries: Array<string>): Array<string>
export function remove(src: string): void
export function copy(src: string, dest: string): void
export function runCommand(command: string, commandDir?: string | undefined | null, jsEnv?: Record<string, string> | undefined | null, quiet?: boolean | undefined | null): ChildProcess
/**
* This allows us to run a pseudoterminal with a fake node ipc channel
* this makes it possible to be backwards compatible with the old implementation
*/
export function nxFork(id: string, forkScript: string, psuedoIpcPath: string, commandDir: string | undefined | null, jsEnv: Record<string, string> | undefined | null, quiet: boolean): ChildProcess
export function hashArray(input: Array<string>): string
export function hashFile(file: string): string | null
export function findImports(projectFileMap: Record<string, Array<string>>): Array<ImportResult>
@ -140,6 +146,11 @@ export interface FileMap {
nonProjectFiles: Array<FileData>
}
export function testOnlyTransferFileMap(projectFiles: Record<string, Array<FileData>>, nonProjectFiles: Array<FileData>): NxWorkspaceFilesExternals
export class ChildProcess {
kill(): void
onExit(callback: (code: number) => void): void
onOutput(callback: (message: string) => void): void
}
export class ImportResult {
file: string
sourceProject: string

View File

@ -246,12 +246,15 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}
const { expandOutputs, getFilesForOutputs, remove, copy, hashArray, hashFile, ImportResult, findImports, transferProjectGraph, HashPlanner, TaskHasher, EventType, Watcher, WorkspaceContext, WorkspaceErrors, testOnlyTransferFileMap } = nativeBinding
const { expandOutputs, getFilesForOutputs, remove, copy, ChildProcess, runCommand, nxFork, hashArray, hashFile, ImportResult, findImports, transferProjectGraph, HashPlanner, TaskHasher, EventType, Watcher, WorkspaceContext, WorkspaceErrors, testOnlyTransferFileMap } = nativeBinding
module.exports.expandOutputs = expandOutputs
module.exports.getFilesForOutputs = getFilesForOutputs
module.exports.remove = remove
module.exports.copy = copy
module.exports.ChildProcess = ChildProcess
module.exports.runCommand = runCommand
module.exports.nxFork = nxFork
module.exports.hashArray = hashArray
module.exports.hashFile = hashFile
module.exports.ImportResult = ImportResult

View File

@ -1,4 +1,5 @@
pub mod cache;
pub mod command;
pub mod glob;
pub mod hasher;
mod logger;

View File

@ -0,0 +1,49 @@
import { PseudoTtyProcess } from '../../utils/child-process';
import { runCommand } from '../index';
describe('runCommand', () => {
it('should run command', async () => {
const childProcess = runCommand('echo "hello world"', process.cwd());
expect(() => {
childProcess.onExit((exitCode) => expect(exitCode).toEqual(0));
});
});
it('should kill a running command', () => {
const childProcess = new PseudoTtyProcess(
runCommand(
'sleep 3 && echo "hello world" > file.txt',
process.cwd()
)
);
childProcess.onExit((exit_code) => {
expect(exit_code).not.toEqual(0);
});
childProcess.kill();
expect(childProcess.isAlive).toEqual(false);
}, 1000);
it('should subscribe to output', (done) => {
const childProcess = runCommand('echo "hello world"', process.cwd());
childProcess.onOutput((output) => {
expect(output.trim()).toEqual('hello world');
});
childProcess.onExit(() => {
done();
});
});
it('should be tty', (done) => {
const childProcess = runCommand('node -p "process.stdout.isTTY"');
childProcess.onOutput((out) => {
let output = JSON.stringify(out.trim());
// check to make sure that we have ansi sequence characters only available in tty terminals
expect(output).toMatchInlineSnapshot(`""\\u001b[33mtrue\\u001b[39m""`);
});
childProcess.onExit((_) => {
done();
});
});
});

View File

@ -0,0 +1,29 @@
import { fork, Serializable } from 'child_process';
import { join } from 'path';
import { PsuedoIPCClient } from './psuedo-ipc';
const psuedoIPCPath = process.argv[2];
const forkId = process.argv[3];
const script = join(__dirname, '../../bin/run-executor.js');
const childProcess = fork(script, {
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
});
const psuedoIPC = new PsuedoIPCClient(psuedoIPCPath);
psuedoIPC.onMessageFromParent(forkId, (message) => {
childProcess.send(message);
});
psuedoIPC.notifyChildIsReady(forkId);
process.on('message', (message: Serializable) => {
psuedoIPC.sendMessageToParent(message);
});
childProcess.on('exit', (code) => {
psuedoIPC.close();
process.exit(code);
});

View File

@ -14,7 +14,13 @@ import {
} from './batch/batch-messages';
import { stripIndents } from '../utils/strip-indents';
import { Task, TaskGraph } from '../config/task-graph';
import { Transform } from 'stream';
import { Readable, Transform } from 'stream';
import { ChildProcess as NativeChildProcess, nxFork } from '../native';
import { PsuedoIPCServer } from './psuedo-ipc';
import { FORKED_PROCESS_OS_SOCKET_PATH } from '../daemon/socket-utils';
import { PseudoTtyProcess } from '../utils/child-process';
const forkScript = join(__dirname, './fork.js');
const workerPath = join(__dirname, './batch/run-batch.js');
@ -22,9 +28,16 @@ export class ForkedProcessTaskRunner {
cliPath = getCliPath();
private readonly verbose = process.env.NX_VERBOSE_LOGGING === 'true';
private processes = new Set<ChildProcess>();
private processes = new Set<ChildProcess | PseudoTtyProcess>();
constructor(private readonly options: DefaultTasksRunnerOptions) {
private psuedoIPCPath = FORKED_PROCESS_OS_SOCKET_PATH(process.pid.toString());
private psuedoIPC = new PsuedoIPCServer(this.psuedoIPCPath);
constructor(private readonly options: DefaultTasksRunnerOptions) {}
async init() {
await this.psuedoIPC.init();
this.setupProcessEventListeners();
}
@ -107,7 +120,155 @@ export class ForkedProcessTaskRunner {
});
}
public forkProcessPipeOutputCapture(
public async forkProcessLegacy(
task: Task,
{
temporaryOutputPath,
streamOutput,
pipeOutput,
taskGraph,
env,
}: {
temporaryOutputPath: string;
streamOutput: boolean;
pipeOutput: boolean;
taskGraph: TaskGraph;
env: NodeJS.ProcessEnv;
}
): Promise<{ code: number; terminalOutput: string }> {
return pipeOutput
? await this.forkProcessPipeOutputCapture(task, {
temporaryOutputPath,
streamOutput,
taskGraph,
env,
})
: await this.forkProcessDirectOutputCapture(task, {
temporaryOutputPath,
streamOutput,
taskGraph,
env,
});
}
public async forkProcess(
task: Task,
{
temporaryOutputPath,
streamOutput,
pipeOutput,
taskGraph,
env,
}: {
temporaryOutputPath: string;
streamOutput: boolean;
pipeOutput: boolean;
taskGraph: TaskGraph;
env: NodeJS.ProcessEnv;
}
): Promise<{ code: number; terminalOutput: string }> {
const shouldPrefix =
streamOutput && process.env.NX_PREFIX_OUTPUT === 'true';
// streamOutput would be false if we are running multiple targets
// there's no point in running the commands in a pty if we are not streaming the output
if (!streamOutput || shouldPrefix || !process.stdout.isTTY) {
return this.forkProcessWithPrefixAndNotTTY(task, {
temporaryOutputPath,
streamOutput,
taskGraph,
env,
});
} else {
return this.forkProcessWithPsuedoTerminal(task, {
temporaryOutputPath,
streamOutput,
taskGraph,
env,
});
}
}
private async forkProcessWithPsuedoTerminal(
task: Task,
{
temporaryOutputPath,
streamOutput,
taskGraph,
env,
}: {
temporaryOutputPath: string;
streamOutput: boolean;
taskGraph: TaskGraph;
env: NodeJS.ProcessEnv;
}
): Promise<{ code: number; terminalOutput: string }> {
const args = getPrintableCommandArgsForTask(task);
if (streamOutput) {
output.logCommand(args.join(' '));
output.addNewline();
}
const childId = task.id;
const p = new PseudoTtyProcess(
nxFork(
childId,
forkScript,
this.psuedoIPCPath,
process.cwd(),
env,
!streamOutput
)
);
await this.psuedoIPC.waitForChildReady(childId);
this.psuedoIPC.sendMessageToChild(childId, {
targetDescription: task.target,
overrides: task.overrides,
taskGraph,
isVerbose: this.verbose,
});
this.processes.add(p);
let terminalOutput = '';
p.onOutput((msg) => {
terminalOutput += msg;
});
return new Promise((res) => {
p.onExit((code) => {
res({
code,
terminalOutput,
});
});
});
}
private forkProcessPipeOutputCapture(
task: Task,
{
streamOutput,
temporaryOutputPath,
taskGraph,
env,
}: {
streamOutput: boolean;
temporaryOutputPath: string;
taskGraph: TaskGraph;
env: NodeJS.ProcessEnv;
}
) {
return this.forkProcessWithPrefixAndNotTTY(task, {
streamOutput,
temporaryOutputPath,
taskGraph,
env,
});
}
private forkProcessWithPrefixAndNotTTY(
task: Task,
{
streamOutput,
@ -203,7 +364,7 @@ export class ForkedProcessTaskRunner {
});
}
public forkProcessDirectOutputCapture(
private forkProcessDirectOutputCapture(
task: Task,
{
streamOutput,
@ -296,10 +457,17 @@ export class ForkedProcessTaskRunner {
}
private setupProcessEventListeners() {
this.psuedoIPC.onMessageFromChildren((message: Serializable) => {
process.send(message);
});
// When the nx process gets a message, it will be sent into the task's process
process.on('message', (message: Serializable) => {
// this.publisher.publish(message.toString());
this.psuedoIPC.sendMessageToChildren(message);
this.processes.forEach((p) => {
if (p.connected) {
if ('connected' in p && p.connected) {
p.send(message);
}
});
@ -308,14 +476,14 @@ export class ForkedProcessTaskRunner {
// Terminate any task processes on exit
process.on('exit', () => {
this.processes.forEach((p) => {
if (p.connected) {
if ('connected' in p ? p.connected : p.isAlive) {
p.kill();
}
});
});
process.on('SIGINT', () => {
this.processes.forEach((p) => {
if (p.connected) {
if ('connected' in p ? p.connected : p.isAlive) {
p.kill('SIGTERM');
}
});
@ -324,7 +492,7 @@ export class ForkedProcessTaskRunner {
});
process.on('SIGTERM', () => {
this.processes.forEach((p) => {
if (p.connected) {
if ('connected' in p ? p.connected : p.isAlive) {
p.kill('SIGTERM');
}
});
@ -333,7 +501,7 @@ export class ForkedProcessTaskRunner {
});
process.on('SIGHUP', () => {
this.processes.forEach((p) => {
if (p.connected) {
if ('connected' in p ? p.connected : p.isAlive) {
p.kill('SIGTERM');
}
});
@ -341,6 +509,10 @@ export class ForkedProcessTaskRunner {
// will store results to the cache and will terminate this process
});
}
destroy() {
this.psuedoIPC.close();
}
}
const colors = [

View File

@ -0,0 +1,182 @@
/**
* Node IPC is specific to Node, but when spawning child processes in Rust, it won't have IPC.
*
* Thus, this is a wrapper which is spawned by Rust, which will create a Node IPC channel and pipe it to a ZeroMQ Channel
*
* Main Nx Process
* * Calls Rust Fork Function
* * `node fork.js`
* * Create a Rust - Node.js Agnostic Channel aka Psuedo IPC Channel
* * This returns RustChildProcess
* * RustChildProcess.onMessage(msg => ());
* * psuedo_ipc_channel.on_message() => tx.send(msg);
* * Node.js Fork Wrapper (fork.js)
* * fork(run-command.js) with `inherit` and `ipc`
* * This will create a Node IPC Channel
* * channel = getPsuedoIpcChannel(process.env.NX_IPC_CHANNEL_ID)
* * forkChildProcess.on('message', writeToPsuedoIpcChannel)
*/
import { connect, Server, Socket } from 'net';
import { consumeMessagesFromSocket } from '../utils/consume-messages-from-socket';
import { Serializable } from 'child_process';
export interface PsuedoIPCMessage {
type: 'TO_CHILDREN_FROM_PARENT' | 'TO_PARENT_FROM_CHILDREN' | 'CHILD_READY';
id: string | undefined;
message: Serializable;
}
export class PsuedoIPCServer {
private sockets = new Set<Socket>();
private server: Server | undefined;
private childMessages: {
onMessage: (message: Serializable) => void;
onClose?: () => void;
onError?: (err: Error) => void;
}[] = [];
constructor(private path: string) {}
init(): Promise<void> {
return new Promise((res) => {
this.server = new Server((socket) => {
this.sockets.add(socket);
this.registerChildMessages(socket);
socket.on('close', () => {
this.sockets.delete(socket);
});
});
this.server.listen(this.path, () => {
res();
});
});
}
private childReadyMap = new Map<string, () => void>();
async waitForChildReady(childId: string) {
return new Promise<void>((res) => {
this.childReadyMap.set(childId, res);
});
}
private registerChildMessages(socket: Socket) {
socket.on(
'data',
consumeMessagesFromSocket(async (rawMessage) => {
const { type, message }: PsuedoIPCMessage = JSON.parse(rawMessage);
if (type === 'TO_PARENT_FROM_CHILDREN') {
for (const childMessage of this.childMessages) {
childMessage.onMessage(message);
}
} else if (type === 'CHILD_READY') {
const childId = message as string;
if (this.childReadyMap.has(childId)) {
this.childReadyMap.get(childId)();
}
}
})
);
socket.on('close', () => {
for (const childMessage of this.childMessages) {
childMessage.onClose?.();
}
});
socket.on('error', (err) => {
for (const childMessage of this.childMessages) {
childMessage.onError?.(err);
}
});
}
sendMessageToChildren(message: Serializable) {
this.sockets.forEach((socket) => {
socket.write(
JSON.stringify({ type: 'TO_CHILDREN_FROM_PARENT', message })
);
// send EOT to indicate that the message has been fully written
socket.write(String.fromCodePoint(4));
});
}
sendMessageToChild(id: string, message: Serializable) {
this.sockets.forEach((socket) => {
socket.write(
JSON.stringify({ type: 'TO_CHILDREN_FROM_PARENT', id, message })
);
socket.write(String.fromCodePoint(4));
});
}
onMessageFromChildren(
onMessage: (message: Serializable) => void,
onClose: () => void = () => {},
onError: (err: Error) => void = (err) => {}
) {
this.childMessages.push({
onMessage,
onClose,
onError,
});
}
close() {
this.server?.close();
this.sockets.forEach((s) => s.destroy());
}
}
export class PsuedoIPCClient {
private socket: Socket | undefined = connect(this.path);
constructor(private path: string) {}
sendMessageToParent(message: Serializable) {
this.socket.write(
JSON.stringify({ type: 'TO_PARENT_FROM_CHILDREN', message })
);
// send EOT to indicate that the message has been fully written
this.socket.write(String.fromCodePoint(4));
}
notifyChildIsReady(id: string) {
this.socket.write(
JSON.stringify({
type: 'CHILD_READY',
message: id,
} as PsuedoIPCMessage)
);
// send EOT to indicate that the message has been fully written
this.socket.write(String.fromCodePoint(4));
}
onMessageFromParent(
forkId: string,
onMessage: (message: Serializable) => void,
onClose: () => void = () => {},
onError: (err: Error) => void = (err) => {}
) {
this.socket.on(
'data',
consumeMessagesFromSocket(async (rawMessage) => {
const { id, type, message }: PsuedoIPCMessage = JSON.parse(rawMessage);
if (type === 'TO_CHILDREN_FROM_PARENT') {
if (id && id === forkId) {
onMessage(message);
} else if (id === undefined) {
onMessage(message);
}
}
})
);
this.socket.on('close', onClose);
this.socket.on('error', onError);
return this;
}
close() {
this.socket?.destroy();
}
}

View File

@ -66,6 +66,9 @@ export class TaskOrchestrator {
) {}
async run() {
// Init the ForkedProcessTaskRunner
await this.forkedProcessTaskRunner.init();
// initial scheduling
await this.scheduleNextTasks();
@ -88,6 +91,7 @@ export class TaskOrchestrator {
'task-execution:start',
'task-execution:end'
);
this.forkedProcessTaskRunner.destroy();
this.cache.removeOldCacheRecords();
return this.completedTasks;
@ -398,25 +402,22 @@ export class TaskOrchestrator {
) {
try {
// execution
const { code, terminalOutput } = pipeOutput
? await this.forkedProcessTaskRunner.forkProcessPipeOutputCapture(
task,
{
const { code, terminalOutput } =
process.env.NX_NATIVE_COMMAND_RUNNER !== 'false'
? await this.forkedProcessTaskRunner.forkProcess(task, {
temporaryOutputPath,
streamOutput,
pipeOutput,
taskGraph: this.taskGraph,
env,
}
)
: await this.forkedProcessTaskRunner.forkProcessDirectOutputCapture(
task,
{
})
: await this.forkedProcessTaskRunner.forkProcessLegacy(task, {
temporaryOutputPath,
streamOutput,
pipeOutput,
taskGraph: this.taskGraph,
env,
}
);
});
return {
code,
@ -565,6 +566,10 @@ export class TaskOrchestrator {
private async pipeOutputCapture(task: Task) {
try {
if (process.env.NX_NATIVE_COMMAND_RUNNER !== 'false') {
return true;
}
const { schema } = await getExecutorForTask(task, this.projectGraph);
return (

View File

@ -3,6 +3,7 @@ import { existsSync } from 'fs';
import { join, relative } from 'path';
import { getPackageManagerCommand } from './package-manager';
import { workspaceRoot, workspaceRootInner } from './workspace-root';
import { ChildProcess } from '../native';
export function runNxSync(
cmd: string,
@ -26,3 +27,37 @@ export function runNxSync(
}
execSync(`${baseCmd} ${cmd}`, options);
}
export class PseudoTtyProcess {
isAlive = true;
exitCallbacks = [];
constructor(private childProcess: ChildProcess) {
childProcess.onExit((exitCode) => {
this.isAlive = false;
this.exitCallbacks.forEach((cb) => cb(exitCode));
});
}
onExit(callback: (code: number) => void): void {
this.exitCallbacks.push(callback);
}
onOutput(callback: (message: string) => void): void {
this.childProcess.onOutput(callback);
}
kill(): void {
try {
this.childProcess.kill();
} catch {
// when the child process completes before we explicitly call kill, this will throw
// do nothing
} finally {
if (this.isAlive == true) {
this.isAlive = false;
}
}
}
}