fix(core): throw errors when task graph has invalid continuous tasks (#30924)

<!-- Please make sure you have read the submission guidelines before
posting an PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
<!-- This is the behavior we have today -->

1. The continuous property is not present in the nx.json schema.
2. When tasks which do not support parallelism depend on a continuous
task, task execution is deadlocked.
3. Circular dependencies between continuous dependencies are allowed.

## Expected Behavior
<!-- This is the behavior we should expect with the changes in this PR
-->

1. The continuous property is added to the nx.json schema.
2. An error is thrown when tasks which do not support parallelism
depends on a continuous task.
3. Circular dependencies between continuous tasks are caught and thrown
as an error.

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #
This commit is contained in:
Jason Jean 2025-04-29 18:51:52 -04:00 committed by GitHub
parent 15d20f925c
commit 84c4dc55d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 118 additions and 18 deletions

View File

@ -543,6 +543,11 @@
"type": "object" "type": "object"
} }
}, },
"continuous": {
"type": "boolean",
"default": false,
"description": "Whether this target runs continuously until stopped"
},
"parallelism": { "parallelism": {
"type": "boolean", "type": "boolean",
"default": true, "default": true,

View File

@ -23,7 +23,6 @@ import {
} from '../project-graph/plugins/tasks-execution-hooks'; } from '../project-graph/plugins/tasks-execution-hooks';
import { createProjectGraphAsync } from '../project-graph/project-graph'; import { createProjectGraphAsync } from '../project-graph/project-graph';
import { NxArgs } from '../utils/command-line-utils'; import { NxArgs } from '../utils/command-line-utils';
import { isRelativePath } from '../utils/fileutils';
import { handleErrors } from '../utils/handle-errors'; import { handleErrors } from '../utils/handle-errors';
import { isCI } from '../utils/is-ci'; import { isCI } from '../utils/is-ci';
import { isNxCloudUsed } from '../utils/nx-cloud-utils'; import { isNxCloudUsed } from '../utils/nx-cloud-utils';
@ -58,6 +57,7 @@ import { TaskResultsLifeCycle } from './life-cycles/task-results-life-cycle';
import { TaskTimingsLifeCycle } from './life-cycles/task-timings-life-cycle'; import { TaskTimingsLifeCycle } from './life-cycles/task-timings-life-cycle';
import { getTuiTerminalSummaryLifeCycle } from './life-cycles/tui-summary-life-cycle'; import { getTuiTerminalSummaryLifeCycle } from './life-cycles/tui-summary-life-cycle';
import { import {
assertTaskGraphDoesNotContainInvalidTargets,
findCycle, findCycle,
makeAcyclic, makeAcyclic,
validateNoAtomizedTasks, validateNoAtomizedTasks,
@ -358,6 +358,8 @@ function createTaskGraphAndRunValidations(
extraOptions.excludeTaskDependencies extraOptions.excludeTaskDependencies
); );
assertTaskGraphDoesNotContainInvalidTargets(taskGraph);
const cycle = findCycle(taskGraph); const cycle = findCycle(taskGraph);
if (cycle) { if (cycle) {
if (process.env.NX_IGNORE_CYCLES === 'true' || nxArgs.nxIgnoreCycles) { if (process.env.NX_IGNORE_CYCLES === 'true' || nxArgs.nxIgnoreCycles) {

View File

@ -1,7 +1,5 @@
import '../internal-testing-utils/mock-fs'; import '../internal-testing-utils/mock-fs';
import { vol } from 'memfs';
import { import {
findCycle, findCycle,
findCycles, findCycles,
@ -22,7 +20,7 @@ describe('task graph utils', () => {
e: ['q', 'a'], e: ['q', 'a'],
q: [], q: [],
}, },
} as any) })
).toEqual(['a', 'c', 'e', 'a']); ).toEqual(['a', 'c', 'e', 'a']);
expect( expect(
@ -36,10 +34,56 @@ describe('task graph utils', () => {
f: ['q'], f: ['q'],
q: ['e'], q: ['e'],
}, },
} as any) })
).toEqual(['a', 'c', 'a']); ).toEqual(['a', 'c', 'a']);
}); });
it('should return a continuous cycle is there', () => {
expect(
findCycle({
dependencies: {
a: [],
b: [],
c: [],
d: [],
e: [],
q: [],
},
continuousDependencies: {
a: ['b', 'c'],
b: ['d'],
c: ['e'],
d: [],
e: ['q', 'a'],
q: [],
},
})
).toEqual(['a', 'c', 'e', 'a']);
expect(
findCycle({
dependencies: {
a: ['b'],
b: [],
c: [],
d: [],
e: [],
f: [],
q: [],
},
continuousDependencies: {
a: [],
b: ['a'],
c: [],
d: [],
e: [],
f: [],
q: [],
},
})
).toEqual(['a', 'b', 'a']);
});
it('should return null when no cycle', () => { it('should return null when no cycle', () => {
expect( expect(
findCycle({ findCycle({
@ -51,7 +95,7 @@ describe('task graph utils', () => {
e: ['q'], e: ['q'],
q: [], q: [],
}, },
} as any) })
).toEqual(null); ).toEqual(null);
}); });
}); });
@ -68,7 +112,7 @@ describe('task graph utils', () => {
e: ['q', 'a'], e: ['q', 'a'],
q: [], q: [],
}, },
} as any) })
).toEqual(new Set(['a', 'c', 'e'])); ).toEqual(new Set(['a', 'c', 'e']));
expect( expect(
@ -82,7 +126,7 @@ describe('task graph utils', () => {
f: ['q'], f: ['q'],
q: ['e'], q: ['e'],
}, },
} as any) })
).toEqual(new Set(['a', 'c', 'e', 'f', 'q'])); ).toEqual(new Set(['a', 'c', 'e', 'f', 'q']));
expect( expect(
findCycles({ findCycles({
@ -95,7 +139,7 @@ describe('task graph utils', () => {
f: ['q'], f: ['q'],
q: ['c'], q: ['c'],
}, },
} as any) })
).toEqual(new Set(['a', 'b', 'd', 'c', 'f', 'q'])); ).toEqual(new Set(['a', 'b', 'd', 'c', 'f', 'q']));
}); });
@ -110,7 +154,7 @@ describe('task graph utils', () => {
e: ['q'], e: ['q'],
q: [], q: [],
}, },
} as any) })
).toEqual(null); ).toEqual(null);
}); });
}); });
@ -126,7 +170,7 @@ describe('task graph utils', () => {
d: [], d: [],
e: ['a'], e: ['a'],
}, },
} as any; };
makeAcyclic(graph); makeAcyclic(graph);
expect(graph.dependencies).toEqual({ expect(graph.dependencies).toEqual({
@ -151,13 +195,12 @@ describe('task graph utils', () => {
mockProcessExit = jest mockProcessExit = jest
.spyOn(process, 'exit') .spyOn(process, 'exit')
.mockImplementation((code: number) => { .mockImplementation((code: number) => {
return undefined as any as never; return undefined as never;
}); });
}); });
afterEach(() => { afterEach(() => {
process.env = env; process.env = env;
vol.reset();
mockProcessExit.mockRestore(); mockProcessExit.mockRestore();
}); });

View File

@ -1,9 +1,12 @@
import { ProjectGraph } from '../config/project-graph'; import { ProjectGraph } from '../config/project-graph';
import { TaskGraph } from '../config/task-graph'; import { Task, TaskGraph } from '../config/task-graph';
import { output } from '../utils/output'; import { output } from '../utils/output';
function _findCycle( function _findCycle(
graph: { dependencies: Record<string, string[]> }, graph: {
dependencies: Record<string, string[]>;
continuousDependencies?: Record<string, string[]>;
},
id: string, id: string,
visited: { [taskId: string]: boolean }, visited: { [taskId: string]: boolean },
path: string[] path: string[]
@ -11,7 +14,10 @@ function _findCycle(
if (visited[id]) return null; if (visited[id]) return null;
visited[id] = true; visited[id] = true;
for (const d of graph.dependencies[id]) { for (const d of [
...graph.dependencies[id],
...(graph.continuousDependencies?.[id] ?? []),
]) {
if (path.includes(d)) return [...path, d]; if (path.includes(d)) return [...path, d];
const cycle = _findCycle(graph, d, visited, [...path, d]); const cycle = _findCycle(graph, d, visited, [...path, d]);
if (cycle) return cycle; if (cycle) return cycle;
@ -25,6 +31,7 @@ function _findCycle(
*/ */
export function findCycle(graph: { export function findCycle(graph: {
dependencies: Record<string, string[]>; dependencies: Record<string, string[]>;
continuousDependencies?: Record<string, string[]>;
}): string[] | null { }): string[] | null {
const visited = {}; const visited = {};
for (const t of Object.keys(graph.dependencies)) { for (const t of Object.keys(graph.dependencies)) {
@ -45,6 +52,7 @@ export function findCycle(graph: {
*/ */
export function findCycles(graph: { export function findCycles(graph: {
dependencies: Record<string, string[]>; dependencies: Record<string, string[]>;
continuousDependencies?: Record<string, string[]>;
}): Set<string> | null { }): Set<string> | null {
const visited = {}; const visited = {};
const cycles = new Set<string>(); const cycles = new Set<string>();
@ -63,7 +71,10 @@ export function findCycles(graph: {
} }
function _makeAcyclic( function _makeAcyclic(
graph: { dependencies: Record<string, string[]> }, graph: {
dependencies: Record<string, string[]>;
continuousDependencies?: Record<string, string[]>;
},
id: string, id: string,
visited: { [taskId: string]: boolean }, visited: { [taskId: string]: boolean },
path: string[] path: string[]
@ -72,9 +83,11 @@ function _makeAcyclic(
visited[id] = true; visited[id] = true;
const deps = graph.dependencies[id]; const deps = graph.dependencies[id];
for (const d of [...deps]) { const continuousDeps = graph.continuousDependencies?.[id] ?? [];
for (const d of [...deps, ...continuousDeps]) {
if (path.includes(d)) { if (path.includes(d)) {
deps.splice(deps.indexOf(d), 1); deps.splice(deps.indexOf(d), 1);
continuousDeps.splice(continuousDeps.indexOf(d), 1);
} else { } else {
_makeAcyclic(graph, d, visited, [...path, d]); _makeAcyclic(graph, d, visited, [...path, d]);
} }
@ -142,3 +155,40 @@ export function validateNoAtomizedTasks(
} }
process.exit(1); process.exit(1);
} }
export function assertTaskGraphDoesNotContainInvalidTargets(
taskGraph: TaskGraph
) {
const invalidTasks = [];
for (const task of Object.values(taskGraph.tasks)) {
if (
task.parallelism === false &&
taskGraph.continuousDependencies[task.id].length > 0
) {
invalidTasks.push(task);
}
}
if (invalidTasks.length > 0) {
throw new NonParallelTaskDependsOnContinuousTasksError(
invalidTasks,
taskGraph
);
}
}
class NonParallelTaskDependsOnContinuousTasksError extends Error {
constructor(public invalidTasks: Task[], taskGraph: TaskGraph) {
let message =
'The following tasks do not support parallelism but depend on continuous tasks:';
for (const task of invalidTasks) {
message += `\n - ${task.id} -> ${taskGraph.continuousDependencies[
task.id
].join(', ')}`;
}
super(message);
this.name = 'NonParallelTaskDependsOnContinuousTasksError';
}
}