feat(core): clean up unneeded continuous tasks after tasks are done (#30746)

## Current Behavior

Continuous tasks keep running until the Nx process itself exits, even when nothing depends on them anymore.

## Expected Behavior

Continuous tasks are cleaned up once they are no longer needed, i.e. once every task that depends on them has finished.
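
In the orchestrator this is driven by a new `cleanUpUnneededContinuousTasks()` step that runs whenever tasks complete (see the `task-orchestrator.ts` diff below). A minimal standalone sketch of that decision follows; the function name and simplified parameter types here are illustrative, not the actual API:

```ts
// task id -> ids of the continuous tasks it depends on
type ContinuousDependencies = Record<string, string[]>;

function findStoppableContinuousTasks(
  continuousDependencies: ContinuousDependencies,
  runningContinuousTaskIds: string[],
  incompleteTaskIds: string[],
  initiatingTaskIds: string[]
): string[] {
  // Initiating tasks (what the user asked Nx to run) are always kept alive.
  const needed = new Set(initiatingTaskIds);
  // Any continuous dependency of a task that has not completed yet is still needed.
  for (const taskId of incompleteTaskIds) {
    for (const dep of continuousDependencies[taskId] ?? []) {
      needed.add(dep);
    }
  }
  // Everything else that is still running can be killed and marked Stopped.
  return runningContinuousTaskIds.filter((id) => !needed.has(id));
}
```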

## Related Issue(s)

Fixes #
Jason Jean · 2025-04-22 09:52:51 -04:00 · committed by GitHub
parent 99d45a3dcd · commit 4e68270efd
17 changed files with 215 additions and 66 deletions

View File

@ -12,6 +12,7 @@
| `context.daemon?` | `DaemonClient` |
| `context.hasher?` | [`TaskHasher`](../../devkit/documents/TaskHasher) |
| `context.initiatingProject?` | `string` |
| `context.initiatingTasks` | [`Task`](../../devkit/documents/Task)[] |
| `context.nxArgs` | `NxArgs` |
| `context.nxJson` | [`NxJsonConfiguration`](../../devkit/documents/NxJsonConfiguration)\<`string`[] \| `"*"`\> |
| `context.projectGraph` | [`ProjectGraph`](../../devkit/documents/ProjectGraph) |

View File

@ -322,7 +322,8 @@ export declare const enum TaskStatus {
RemoteCache = 5,
NotStarted = 6,
InProgress = 7,
Shared = 8
Shared = 8,
Stopped = 9
}
export interface TaskTarget {

View File

@ -112,7 +112,6 @@ impl App {
pub fn print_task_terminal_output(
&mut self,
task_id: String,
status: TaskStatus,
output: String,
) {
if let Some(tasks_list) = self
@ -120,8 +119,9 @@ impl App {
.iter_mut()
.find_map(|c| c.as_any_mut().downcast_mut::<TasksList>())
{
// If the status is a cache hit, we need to create a new parser and writer for the task in order to print the output
if is_cache_hit(status) {
// Tasks run within a pseudo-terminal always have a pty instance and do not need a new one
// Tasks not run within a pseudo-terminal need a new pty instance to print output
if !tasks_list.pty_instances.contains_key(&task_id) {
let (parser, parser_and_writer) = TasksList::create_empty_parser_and_noop_writer();
// Add ANSI escape sequence to hide cursor at the end of output, it would be confusing to have it visible when a task is a cache hit
@ -129,7 +129,6 @@ impl App {
TasksList::write_output_to_parser(parser, output_with_hidden_cursor);
tasks_list.create_and_register_pty_instance(&task_id, parser_and_writer);
tasks_list.update_task_status(task_id.clone(), status);
let _ = tasks_list.handle_resize(None);
return;
}
@ -137,7 +136,6 @@ impl App {
// If the task is continuous, we are only updating the status, not the output
if let Some(task) = tasks_list.tasks.iter_mut().find(|t| t.name == task_id) {
if task.continuous {
tasks_list.update_task_status(task_id.clone(), status);
let _ = tasks_list.handle_resize(None);
}
}

View File

@ -124,6 +124,8 @@ pub enum TaskStatus {
InProgress,
// This task is being run in a different process
Shared,
// This continuous task has been stopped by Nx
Stopped,
}
impl std::str::FromStr for TaskStatus {
@ -951,17 +953,6 @@ impl TasksList {
task_result.task.end_time.unwrap() as u128,
);
}
// If the task never had a pty, it must mean that it was run outside of the pseudo-terminal.
// We create a new parser and writer for the task and register it and then write the final output to the parser
if !self.pty_instances.contains_key(&task.name) {
let (parser, parser_and_writer) = Self::create_empty_parser_and_noop_writer();
if let Some(task_result_output) = task_result.terminal_output {
Self::write_output_to_parser(parser, task_result_output);
}
let task_name = task.name.clone();
self.create_and_register_pty_instance(&task_name, parser_and_writer);
}
}
}
self.sort_tasks();
@ -1402,6 +1393,12 @@ impl Component for TasksList {
Cell::from(Line::from(spans))
}
TaskStatus::Stopped => Cell::from(Line::from(vec![
Span::raw(if is_selected { ">" } else { " " }),
Span::raw(" "),
Span::styled("⯀️", Style::default().fg(Color::DarkGray)),
Span::raw(" "),
])),
TaskStatus::NotStarted => Cell::from(Line::from(vec![
Span::raw(if is_selected { ">" } else { " " }),
// No need for parallel section check for pending tasks

View File

@ -231,6 +231,12 @@ impl<'a> TerminalPane<'a> {
.fg(Color::LightCyan)
.add_modifier(Modifier::BOLD),
),
TaskStatus::Stopped => Span::styled(
" ⯀️ ",
Style::default()
.fg(Color::DarkGray)
.add_modifier(Modifier::BOLD),
),
TaskStatus::NotStarted => Span::styled(
" · ",
Style::default()
@ -249,7 +255,7 @@ impl<'a> TerminalPane<'a> {
TaskStatus::Failure => Color::Red,
TaskStatus::Skipped => Color::Yellow,
TaskStatus::InProgress | TaskStatus::Shared => Color::LightCyan,
TaskStatus::NotStarted => Color::DarkGray,
TaskStatus::NotStarted | TaskStatus::Stopped => Color::DarkGray,
})
}
@ -368,6 +374,27 @@ impl<'a> StatefulWidget for TerminalPane<'a> {
return;
}
// If the task has been stopped but does not have a pty
if matches!(state.task_status, TaskStatus::Stopped) && !state.has_pty {
let message = vec![Line::from(vec![Span::styled(
"Running in another Nx process...",
if state.is_focused {
self.get_base_style(TaskStatus::Stopped)
} else {
self.get_base_style(TaskStatus::Stopped)
.add_modifier(Modifier::DIM)
},
)])];
let paragraph = Paragraph::new(message)
.block(block)
.alignment(Alignment::Center)
.style(Style::default());
Widget::render(paragraph, area, buf);
return;
}
let inner_area = block.inner(area);
if let Some(pty_data) = &self.pty_data {

View File

@ -114,11 +114,12 @@ impl AppLifeCycle {
pub fn print_task_terminal_output(
&mut self,
task: Task,
status: String,
_status: String,
output: String,
) -> napi::Result<()> {
debug!("Received task terminal output for {}", task.id);
if let Ok(mut app) = self.app.lock() {
app.print_task_terminal_output(task.id, status.parse().unwrap(), output);
app.print_task_terminal_output(task.id, output);
}
Ok(())
}

View File

@ -61,7 +61,8 @@ pub fn sort_task_items(tasks: &mut [TaskItem]) {
| TaskStatus::LocalCacheKeptExisting
| TaskStatus::LocalCache
| TaskStatus::RemoteCache
| TaskStatus::Skipped => 2,
| TaskStatus::Skipped
| TaskStatus::Stopped => 2,
TaskStatus::NotStarted => 3,
}
};
@ -326,6 +327,7 @@ mod tests {
| TaskStatus::LocalCacheKeptExisting
| TaskStatus::LocalCache
| TaskStatus::RemoteCache
| TaskStatus::Stopped
| TaskStatus::Skipped => 2,
TaskStatus::NotStarted => 3,
}

View File

@ -113,6 +113,7 @@ export const defaultTasksRunner: TasksRunner<
context: {
target: string;
initiatingProject?: string;
initiatingTasks: Task[];
projectGraph: ProjectGraph;
nxJson: NxJsonConfiguration;
nxArgs: NxArgs;
@ -134,6 +135,7 @@ async function runAllTasks(
options: DefaultTasksRunnerOptions,
context: {
initiatingProject?: string;
initiatingTasks: Task[];
projectGraph: ProjectGraph;
nxJson: NxJsonConfiguration;
nxArgs: NxArgs;
@ -145,6 +147,7 @@ async function runAllTasks(
const orchestrator = new TaskOrchestrator(
context.hasher,
context.initiatingProject,
context.initiatingTasks,
context.projectGraph,
context.taskGraph,
context.nxJson,

View File

@ -77,6 +77,7 @@ export async function initTasksRunner(nxArgs: NxArgs) {
nxArgs: { ...nxArgs, parallel: opts.parallel },
loadDotEnvFiles: true,
initiatingProject: null,
initiatingTasks: [],
});
return {
@ -135,6 +136,7 @@ async function createOrchestrator(
const orchestrator = new TaskOrchestrator(
hasher,
null,
[],
projectGraph,
taskGraph,
nxJson,

View File

@ -67,7 +67,7 @@ export interface LifeCycle {
registerRunningTask?(
taskId: string,
parserAndWriter: ExternalObject<[any, any]>
): Promise<void>;
): void;
setTaskStatus?(taskId: string, status: NativeTaskStatus): void;
@ -152,13 +152,13 @@ export class CompositeLifeCycle implements LifeCycle {
}
}
async registerRunningTask(
registerRunningTask(
taskId: string,
parserAndWriter: ExternalObject<[any, any]>
): Promise<void> {
): void {
for (let l of this.lifeCycles) {
if (l.registerRunningTask) {
await l.registerRunningTask(taskId, parserAndWriter);
l.registerRunningTask(taskId, parserAndWriter);
}
}
}

View File

@ -1,4 +1,5 @@
import { EOL } from 'node:os';
import { TaskStatus as NativeTaskStatus } from '../../native';
import { Task } from '../../config/task-graph';
import { output } from '../../utils/output';
import type { LifeCycle } from '../life-cycle';
@ -18,6 +19,7 @@ export function getTuiTerminalSummaryLifeCycle({
args,
overrides,
initiatingProject,
initiatingTasks,
resolveRenderIsDonePromise,
}: {
projectNames: string[];
@ -25,6 +27,7 @@ export function getTuiTerminalSummaryLifeCycle({
args: { targets?: string[]; configuration?: string; parallel?: number };
overrides: Record<string, unknown>;
initiatingProject: string;
initiatingTasks: Task[];
resolveRenderIsDonePromise: (value: void) => void;
}) {
const lifeCycle = {} as Partial<LifeCycle>;
@ -37,6 +40,7 @@ export function getTuiTerminalSummaryLifeCycle({
let totalSuccessfulTasks = 0;
let totalFailedTasks = 0;
let totalCompletedTasks = 0;
let totalStoppedTasks = 0;
let timeTakenText: string;
const failedTasks = new Set<string>();
@ -55,13 +59,20 @@ export function getTuiTerminalSummaryLifeCycle({
lifeCycle.printTaskTerminalOutput = (task, taskStatus, terminalOutput) => {
tasksToTerminalOutputs[task.id] = { terminalOutput, taskStatus };
taskIdsInOrderOfCompletion.push(task.id);
};
lifeCycle.setTaskStatus = (taskId, taskStatus) => {
if (taskStatus === NativeTaskStatus.Stopped) {
totalStoppedTasks++;
taskIdsInOrderOfCompletion.push(taskId);
}
};
lifeCycle.endTasks = (taskResults) => {
for (let t of taskResults) {
totalCompletedTasks++;
inProgressTasks.delete(t.task.id);
taskIdsInOrderOfCompletion.push(t.task.id);
switch (t.status) {
case 'remote-cache':
@ -106,7 +117,7 @@ export function getTuiTerminalSummaryLifeCycle({
const printRunOneSummary = () => {
let lines: string[] = [];
const failure = totalSuccessfulTasks !== totalTasks;
const failure = totalSuccessfulTasks + totalStoppedTasks !== totalTasks;
// Prints task outputs in the order they were completed
// above the summary, since run-one should print all task results.
@ -153,7 +164,7 @@ export function getTuiTerminalSummaryLifeCycle({
);
}
lines = [output.colors.green(lines.join(EOL))];
} else if (totalCompletedTasks === totalTasks) {
} else if (totalCompletedTasks + totalStoppedTasks === totalTasks) {
let text = `Ran target ${output.bold(
targets[0]
)} for project ${output.bold(initiatingProject)}`;
@ -219,7 +230,7 @@ export function getTuiTerminalSummaryLifeCycle({
console.log('');
const lines: string[] = [];
const failure = totalSuccessfulTasks !== totalTasks;
const failure = totalSuccessfulTasks + totalStoppedTasks !== totalTasks;
for (const taskId of taskIdsInOrderOfCompletion) {
const { terminalOutput, taskStatus } = tasksToTerminalOutputs[taskId];
@ -241,7 +252,7 @@ export function getTuiTerminalSummaryLifeCycle({
lines.push(...output.getVerticalSeparatorLines(failure ? 'red' : 'green'));
if (totalSuccessfulTasks === totalTasks) {
if (totalSuccessfulTasks + totalStoppedTasks === totalTasks) {
const successSummaryRows = [];
const text = `Successfully ran ${formatTargetsAndProjects(
projectNames,

View File

@ -75,13 +75,18 @@ const originalConsoleError = console.error.bind(console);
async function getTerminalOutputLifeCycle(
initiatingProject: string,
initiatingTasks: Task[],
projectNames: string[],
tasks: Task[],
taskGraph: TaskGraph,
nxArgs: NxArgs,
nxJson: NxJsonConfiguration,
overrides: Record<string, unknown>
): Promise<{ lifeCycle: LifeCycle; renderIsDone: Promise<void> }> {
): Promise<{
lifeCycle: LifeCycle;
printSummary?: () => void;
renderIsDone: Promise<void>;
}> {
const overridesWithoutHidden = { ...overrides };
delete overridesWithoutHidden['__overrides_unparsed__'];
@ -129,11 +134,7 @@ async function getTerminalOutputLifeCycle(
let titleText = '';
if (isRunOne) {
const mainTaskId = createTaskId(
initiatingProject,
nxArgs.targets[0],
nxArgs.configuration
);
const mainTaskId = initiatingTasks[0].id;
pinnedTasks.push(mainTaskId);
const mainContinuousDependencies =
taskGraph.continuousDependencies[mainTaskId];
@ -169,6 +170,7 @@ async function getTerminalOutputLifeCycle(
args: nxArgs,
overrides: overridesWithoutHidden,
initiatingProject,
initiatingTasks,
resolveRenderIsDonePromise,
});
@ -179,7 +181,6 @@ async function getTerminalOutputLifeCycle(
process.stderr.write = originalStderrWrite;
console.log = originalConsoleLog;
console.error = originalConsoleError;
printSummary();
});
}
@ -264,7 +265,6 @@ async function getTerminalOutputLifeCycle(
process.stderr.write = originalStderrWrite;
console.log = originalConsoleLog;
console.error = originalConsoleError;
printSummary();
// Print the intercepted Nx Cloud logs
for (const log of interceptedNxCloudLogs) {
const logString = log.toString().trimStart();
@ -278,6 +278,7 @@ async function getTerminalOutputLifeCycle(
return {
lifeCycle: new CompositeLifeCycle(lifeCycles),
printSummary,
renderIsDone,
};
}
@ -445,6 +446,7 @@ export async function runCommandForTasks(
extraOptions: { excludeTaskDependencies: boolean; loadDotEnvFiles: boolean }
): Promise<TaskResults> {
const projectNames = projectsToRun.map((t) => t.name);
const projectNameSet = new Set(projectNames);
const { projectGraph, taskGraph } = await ensureWorkspaceIsInSyncAndGetGraphs(
currentProjectGraph,
@ -457,8 +459,16 @@ export async function runCommandForTasks(
);
const tasks = Object.values(taskGraph.tasks);
const { lifeCycle, renderIsDone } = await getTerminalOutputLifeCycle(
const initiatingTasks = tasks.filter(
(t) =>
projectNameSet.has(t.target.project) &&
nxArgs.targets.includes(t.target.target)
);
const { lifeCycle, renderIsDone, printSummary } =
await getTerminalOutputLifeCycle(
initiatingProject,
initiatingTasks,
projectNames,
tasks,
taskGraph,
@ -476,10 +486,15 @@ export async function runCommandForTasks(
nxArgs,
loadDotEnvFiles: extraOptions.loadDotEnvFiles,
initiatingProject,
initiatingTasks,
});
await renderIsDone;
if (printSummary) {
printSummary();
}
await printNxKey();
return taskResults;
@ -814,6 +829,7 @@ export async function invokeTasksRunner({
nxArgs,
loadDotEnvFiles,
initiatingProject,
initiatingTasks,
}: {
tasks: Task[];
projectGraph: ProjectGraph;
@ -823,6 +839,7 @@ export async function invokeTasksRunner({
nxArgs: NxArgs;
loadDotEnvFiles: boolean;
initiatingProject: string | null;
initiatingTasks: Task[];
}): Promise<{ [id: string]: TaskResult }> {
setEnvVarsBasedOnArgs(nxArgs, loadDotEnvFiles);
@ -861,6 +878,7 @@ export async function invokeTasksRunner({
{
initiatingProject:
nxArgs.outputStyle === 'compact' ? null : initiatingProject,
initiatingTasks,
projectGraph,
nxJson,
nxArgs,

View File

@ -1,7 +1,11 @@
import type { Serializable } from 'child_process';
export abstract class RunningTask {
abstract getResults(): Promise<{ code: number; terminalOutput: string }>;
abstract onExit(cb: (code: number) => void): void;
abstract kill(signal?: NodeJS.Signals | number): Promise<void> | void;
abstract send?(message: Serializable): void;
}

View File

@ -0,0 +1,36 @@
import { RunningTask } from './running-task';
import { RunningTasksService } from '../../native';
export class SharedRunningTask implements RunningTask {
private exitCallbacks: ((code: number) => void)[] = [];
constructor(
private runningTasksService: RunningTasksService,
taskId: string
) {
this.waitForTaskToFinish(taskId).then(() => {
// notify exit callbacks
this.exitCallbacks.forEach((cb) => cb(0));
});
}
async getResults(): Promise<{ code: number; terminalOutput: string }> {
throw new Error('Results cannot be retrieved from a shared task');
}
kill(): void {
this.exitCallbacks.forEach((cb) => cb(0));
}
onExit(cb: (code: number) => void): void {
this.exitCallbacks.push(cb);
}
private async waitForTaskToFinish(taskId: string) {
console.log(`Waiting for ${taskId} in another nx process`);
// wait for the running task to finish
do {
await new Promise((resolve) => setTimeout(resolve, 100));
} while (this.runningTasksService.getRunningTasks([taskId]).length);
}
}

View File

@ -43,6 +43,7 @@ import {
removeTasksFromTaskGraph,
shouldStreamOutput,
} from './utils';
import { SharedRunningTask } from './running-tasks/shared-running-task';
export class TaskOrchestrator {
private taskDetails: TaskDetails | null = getTaskDetails();
@ -67,6 +68,8 @@ export class TaskOrchestrator {
);
private reverseTaskDeps = calculateReverseDeps(this.taskGraph);
private initializingTaskIds = new Set(this.initiatingTasks.map((t) => t.id));
private processedTasks = new Map<string, Promise<NodeJS.ProcessEnv>>();
private processedBatches = new Map<Batch, Promise<void>>();
@ -81,13 +84,12 @@ export class TaskOrchestrator {
private runningContinuousTasks = new Map<string, RunningTask>();
private cleaningUp = false;
// endregion internal state
constructor(
private readonly hasher: TaskHasher,
private readonly initiatingProject: string | undefined,
private readonly initiatingTasks: Task[],
private readonly projectGraph: ProjectGraph,
private readonly taskGraph: TaskGraph,
private readonly nxJson: NxJsonConfiguration,
@ -652,16 +654,24 @@ export class TaskOrchestrator {
this.options.lifeCycle.setTaskStatus(task.id, NativeTaskStatus.Shared);
}
const runningTask = new SharedRunningTask(
this.runningTasksService,
task.id
);
this.runningContinuousTasks.set(task.id, runningTask);
runningTask.onExit(() => {
this.runningContinuousTasks.delete(task.id);
});
// task is already running by another process, we schedule the next tasks
// and release the threads
await this.scheduleNextTasksAndReleaseThreads();
// wait for the running task to finish
do {
console.log(`Waiting for ${task.id} in another nx process`);
await new Promise((resolve) => setTimeout(resolve, 100));
} while (this.runningTasksService.getRunningTasks([task.id]).length);
return;
if (this.initializingTaskIds.has(task.id)) {
// Hold the thread forever
await new Promise(() => {});
}
return runningTask;
}
const taskSpecificEnv = await this.processedTasks.get(task.id);
@ -708,15 +718,12 @@ export class TaskOrchestrator {
childProcess.onExit(() => {
this.runningTasksService.removeRunningTask(task.id);
this.runningContinuousTasks.delete(task.id);
});
if (
this.initiatingProject === task.target.project &&
this.options.targets.length === 1 &&
this.options.targets[0] === task.target.target
) {
await childProcess.getResults();
} else {
await this.scheduleNextTasksAndReleaseThreads();
if (this.initializingTaskIds.has(task.id)) {
// Hold the thread forever
await new Promise(() => {});
}
return childProcess;
@ -832,6 +839,8 @@ export class TaskOrchestrator {
) {
this.tasksSchedule.complete(taskResults.map(({ taskId }) => taskId));
this.cleanUpUnneededContinuousTasks();
for (const { taskId, status } of taskResults) {
if (this.completedTasks[taskId] === undefined) {
this.completedTasks[taskId] = status;
@ -914,11 +923,14 @@ export class TaskOrchestrator {
// endregion utils
private async cleanup() {
this.cleaningUp = true;
await Promise.all(
Array.from(this.runningContinuousTasks).map(async ([taskId, t]) => {
try {
return t.kill();
await t.kill();
this.options.lifeCycle.setTaskStatus(
taskId,
NativeTaskStatus.Stopped
);
} catch (e) {
console.error(`Unable to terminate ${taskId}\nError:`, e);
} finally {
@ -927,6 +939,31 @@ export class TaskOrchestrator {
})
);
}
private cleanUpUnneededContinuousTasks() {
const incompleteTasks = this.tasksSchedule.getIncompleteTasks();
const neededContinuousTasks = new Set(this.initializingTaskIds);
for (const task of incompleteTasks) {
const continuousDependencies =
this.taskGraph.continuousDependencies[task.id];
for (const continuousDependency of continuousDependencies) {
neededContinuousTasks.add(continuousDependency);
}
}
for (const taskId of this.runningContinuousTasks.keys()) {
if (!neededContinuousTasks.has(taskId)) {
const runningTask = this.runningContinuousTasks.get(taskId);
if (runningTask) {
runningTask.kill();
this.options.lifeCycle.setTaskStatus(
taskId,
NativeTaskStatus.Stopped
);
}
}
}
}
}
export function getThreadCount(

View File

@ -23,6 +23,7 @@ export type TasksRunner<T = unknown> = (
context?: {
target?: string;
initiatingProject?: string | null;
initiatingTasks: Task[];
projectGraph: ProjectGraph;
nxJson: NxJsonConfiguration;
nxArgs: NxArgs;

View File

@ -101,6 +101,16 @@ export class TasksSchedule {
: null;
}
public getIncompleteTasks(): Task[] {
const incompleteTasks: Task[] = [];
for (const taskId in this.taskGraph.tasks) {
if (!this.completedTasks.has(taskId)) {
incompleteTasks.push(this.taskGraph.tasks[taskId]);
}
}
return incompleteTasks;
}
private async scheduleTasks() {
if (this.options.batch || process.env.NX_BATCH_MODE === 'true') {
await this.scheduleBatches();