feat(core): add lifecycle to record task history & retrieve via daemon (#26593)

<!-- Please make sure you have read the submission guidelines before
posting an PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
<!-- This is the behavior we have today -->

## Expected Behavior
<!-- This is the behavior we should expect with the changes in this PR
-->

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #
This commit is contained in:
MaxKless 2024-06-24 16:22:15 +02:00 committed by GitHub
parent 65e0cb90de
commit dece9afc0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 318 additions and 21 deletions

View File

@ -45,6 +45,11 @@ import {
} from '../message-types/get-files-in-directory'; } from '../message-types/get-files-in-directory';
import { HASH_GLOB, HandleHashGlobMessage } from '../message-types/hash-glob'; import { HASH_GLOB, HandleHashGlobMessage } from '../message-types/hash-glob';
import { NxWorkspaceFiles } from '../../native'; import { NxWorkspaceFiles } from '../../native';
import { TaskRun } from '../../utils/task-history';
import {
HandleGetTaskHistoryForHashesMessage,
HandleWriteTaskRunsToHistoryMessage,
} from '../message-types/task-history';
const DAEMON_ENV_SETTINGS = { const DAEMON_ENV_SETTINGS = {
NX_PROJECT_GLOB_CACHE: 'false', NX_PROJECT_GLOB_CACHE: 'false',
@ -312,6 +317,25 @@ export class DaemonClient {
return this.sendToDaemonViaQueue(message); return this.sendToDaemonViaQueue(message);
} }
getTaskHistoryForHashes(hashes: string[]): Promise<{
[hash: string]: TaskRun[];
}> {
const message: HandleGetTaskHistoryForHashesMessage = {
type: 'GET_TASK_HISTORY_FOR_HASHES',
hashes,
};
return this.sendToDaemonViaQueue(message);
}
writeTaskRunsToHistory(taskRuns: TaskRun[]): Promise<void> {
const message: HandleWriteTaskRunsToHistoryMessage = {
type: 'WRITE_TASK_RUNS_TO_HISTORY',
taskRuns,
};
return this.sendMessageToDaemon(message);
}
async isServerAvailable(): Promise<boolean> { async isServerAvailable(): Promise<boolean> {
return new Promise((resolve) => { return new Promise((resolve) => {
try { try {

View File

@ -0,0 +1,38 @@
import { TaskRun } from '../../utils/task-history';
export const GET_TASK_HISTORY_FOR_HASHES =
'GET_TASK_HISTORY_FOR_HASHES' as const;
export type HandleGetTaskHistoryForHashesMessage = {
type: typeof GET_TASK_HISTORY_FOR_HASHES;
hashes: string[];
};
export function isHandleGetTaskHistoryForHashesMessage(
message: unknown
): message is HandleGetTaskHistoryForHashesMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === GET_TASK_HISTORY_FOR_HASHES
);
}
export const WRITE_TASK_RUNS_TO_HISTORY = 'WRITE_TASK_RUNS_TO_HISTORY' as const;
export type HandleWriteTaskRunsToHistoryMessage = {
type: typeof WRITE_TASK_RUNS_TO_HISTORY;
taskRuns: TaskRun[];
};
export function isHandleWriteTaskRunsToHistoryMessage(
message: unknown
): message is HandleWriteTaskRunsToHistoryMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === WRITE_TASK_RUNS_TO_HISTORY
);
}

View File

@ -0,0 +1,9 @@
import { getHistoryForHashes } from '../../utils/task-history';
export async function handleGetTaskHistoryForHashes(hashes: string[]) {
const history = await getHistoryForHashes(hashes);
return {
response: JSON.stringify(history),
description: 'handleGetTaskHistoryForHashes',
};
}

View File

@ -0,0 +1,9 @@
import { TaskRun, writeTaskRunsToHistory } from '../../utils/task-history';
export async function handleWriteTaskRunsToHistory(taskRuns: TaskRun[]) {
await writeTaskRunsToHistory(taskRuns);
return {
response: 'true',
description: 'handleWriteTaskRunsToHistory',
};
}

View File

@ -70,6 +70,12 @@ import {
import { handleGetFilesInDirectory } from './handle-get-files-in-directory'; import { handleGetFilesInDirectory } from './handle-get-files-in-directory';
import { HASH_GLOB, isHandleHashGlobMessage } from '../message-types/hash-glob'; import { HASH_GLOB, isHandleHashGlobMessage } from '../message-types/hash-glob';
import { handleHashGlob } from './handle-hash-glob'; import { handleHashGlob } from './handle-hash-glob';
import {
isHandleGetTaskHistoryForHashesMessage,
isHandleWriteTaskRunsToHistoryMessage,
} from '../message-types/task-history';
import { handleGetTaskHistoryForHashes } from './handle-get-task-history';
import { handleWriteTaskRunsToHistory } from './handle-write-task-runs-to-history';
let performanceObserver: PerformanceObserver | undefined; let performanceObserver: PerformanceObserver | undefined;
let workspaceWatcherError: Error | undefined; let workspaceWatcherError: Error | undefined;
@ -202,6 +208,14 @@ async function handleMessage(socket, data: string) {
await handleResult(socket, HASH_GLOB, () => await handleResult(socket, HASH_GLOB, () =>
handleHashGlob(payload.globs, payload.exclude) handleHashGlob(payload.globs, payload.exclude)
); );
} else if (isHandleGetTaskHistoryForHashesMessage(payload)) {
await handleResult(socket, 'GET_TASK_HISTORY_FOR_HASHES', () =>
handleGetTaskHistoryForHashes(payload.hashes)
);
} else if (isHandleWriteTaskRunsToHistoryMessage(payload)) {
await handleResult(socket, 'WRITE_TASK_RUNS_TO_HISTORY', () =>
handleWriteTaskRunsToHistory(payload.taskRuns)
);
} else { } else {
await respondWithErrorAndExit( await respondWithErrorAndExit(
socket, socket,

View File

@ -56,11 +56,11 @@ export const defaultTasksRunner: TasksRunner<
(options as any)['parallel'] = Number((options as any)['maxParallel'] || 3); (options as any)['parallel'] = Number((options as any)['maxParallel'] || 3);
} }
options.lifeCycle.startCommand(); await options.lifeCycle.startCommand();
try { try {
return await runAllTasks(tasks, options, context); return await runAllTasks(tasks, options, context);
} finally { } finally {
options.lifeCycle.endCommand(); await options.lifeCycle.endCommand();
} }
}; };

View File

@ -13,11 +13,11 @@ export interface TaskMetadata {
} }
export interface LifeCycle { export interface LifeCycle {
startCommand?(): void; startCommand?(): void | Promise<void>;
endCommand?(): void; endCommand?(): void | Promise<void>;
scheduleTask?(task: Task): void; scheduleTask?(task: Task): void | Promise<void>;
/** /**
* @deprecated use startTasks * @deprecated use startTasks
@ -33,9 +33,12 @@ export interface LifeCycle {
*/ */
endTask?(task: Task, code: number): void; endTask?(task: Task, code: number): void;
startTasks?(task: Task[], metadata: TaskMetadata): void; startTasks?(task: Task[], metadata: TaskMetadata): void | Promise<void>;
endTasks?(taskResults: TaskResult[], metadata: TaskMetadata): void; endTasks?(
taskResults: TaskResult[],
metadata: TaskMetadata
): void | Promise<void>;
printTaskTerminalOutput?( printTaskTerminalOutput?(
task: Task, task: Task,
@ -47,26 +50,26 @@ export interface LifeCycle {
export class CompositeLifeCycle implements LifeCycle { export class CompositeLifeCycle implements LifeCycle {
constructor(private readonly lifeCycles: LifeCycle[]) {} constructor(private readonly lifeCycles: LifeCycle[]) {}
startCommand(): void { async startCommand(): Promise<void> {
for (let l of this.lifeCycles) { for (let l of this.lifeCycles) {
if (l.startCommand) { if (l.startCommand) {
l.startCommand(); await l.startCommand();
} }
} }
} }
endCommand(): void { async endCommand(): Promise<void> {
for (let l of this.lifeCycles) { for (let l of this.lifeCycles) {
if (l.endCommand) { if (l.endCommand) {
l.endCommand(); await l.endCommand();
} }
} }
} }
scheduleTask(task: Task): void { async scheduleTask(task: Task): Promise<void> {
for (let l of this.lifeCycles) { for (let l of this.lifeCycles) {
if (l.scheduleTask) { if (l.scheduleTask) {
l.scheduleTask(task); await l.scheduleTask(task);
} }
} }
} }
@ -87,20 +90,23 @@ export class CompositeLifeCycle implements LifeCycle {
} }
} }
startTasks(tasks: Task[], metadata: TaskMetadata): void { async startTasks(tasks: Task[], metadata: TaskMetadata): Promise<void> {
for (let l of this.lifeCycles) { for (let l of this.lifeCycles) {
if (l.startTasks) { if (l.startTasks) {
l.startTasks(tasks, metadata); await l.startTasks(tasks, metadata);
} else if (l.startTask) { } else if (l.startTask) {
tasks.forEach((t) => l.startTask(t)); tasks.forEach((t) => l.startTask(t));
} }
} }
} }
endTasks(taskResults: TaskResult[], metadata: TaskMetadata): void { async endTasks(
taskResults: TaskResult[],
metadata: TaskMetadata
): Promise<void> {
for (let l of this.lifeCycles) { for (let l of this.lifeCycles) {
if (l.endTasks) { if (l.endTasks) {
l.endTasks(taskResults, metadata); await l.endTasks(taskResults, metadata);
} else if (l.endTask) { } else if (l.endTask) {
taskResults.forEach((t) => l.endTask(t.task, t.code)); taskResults.forEach((t) => l.endTask(t.task, t.code));
} }

View File

@ -0,0 +1,71 @@
import { serializeTarget } from '../../utils/serialize-target';
import { Task } from '../../config/task-graph';
import { output } from '../../utils/output';
import {
getHistoryForHashes,
TaskRun,
writeTaskRunsToHistory as writeTaskRunsToHistory,
} from '../../utils/task-history';
import { LifeCycle, TaskResult } from '../life-cycle';
export class TaskHistoryLifeCycle implements LifeCycle {
private startTimings: Record<string, number> = {};
private taskRuns: TaskRun[] = [];
startTasks(tasks: Task[]): void {
for (let task of tasks) {
this.startTimings[task.id] = new Date().getTime();
}
}
async endTasks(taskResults: TaskResult[]) {
const taskRuns: TaskRun[] = taskResults.map((taskResult) => ({
project: taskResult.task.target.project,
target: taskResult.task.target.target,
configuration: taskResult.task.target.configuration,
hash: taskResult.task.hash,
code: taskResult.code.toString(),
status: taskResult.status,
start: (
taskResult.task.startTime ?? this.startTimings[taskResult.task.id]
).toString(),
end: (taskResult.task.endTime ?? new Date().getTime()).toString(),
}));
this.taskRuns.push(...taskRuns);
}
async endCommand() {
await writeTaskRunsToHistory(this.taskRuns);
const history = await getHistoryForHashes(this.taskRuns.map((t) => t.hash));
const flakyTasks: string[] = [];
// check if any hash has different exit codes => flaky
for (let hash in history) {
if (
history[hash].length > 1 &&
history[hash].some((run) => run.code !== history[hash][0].code)
) {
flakyTasks.push(
serializeTarget(
history[hash][0].project,
history[hash][0].target,
history[hash][0].configuration
)
);
}
}
if (flakyTasks.length > 0) {
output.warn({
title: `Nx detected ${
flakyTasks.length === 1 ? 'a flaky task' : ' flaky tasks'
}`,
bodyLines: [
,
...flakyTasks.map((t) => ` ${t}`),
'',
`Flaky tasks can disrupt your CI pipeline. Automatically retry them with Nx Cloud. Learn more at https://nx.dev/ci/features/flaky-tasks`,
],
});
}
}
}

View File

@ -16,6 +16,7 @@ import { createRunOneDynamicOutputRenderer } from './life-cycles/dynamic-run-one
import { ProjectGraph, ProjectGraphProjectNode } from '../config/project-graph'; import { ProjectGraph, ProjectGraphProjectNode } from '../config/project-graph';
import { import {
NxJsonConfiguration, NxJsonConfiguration,
readNxJson,
TargetDefaults, TargetDefaults,
TargetDependencies, TargetDependencies,
} from '../config/nx-json'; } from '../config/nx-json';
@ -28,6 +29,8 @@ import { hashTasksThatDoNotDependOnOutputsOfOtherTasks } from '../hasher/hash-ta
import { daemonClient } from '../daemon/client/client'; import { daemonClient } from '../daemon/client/client';
import { StoreRunInformationLifeCycle } from './life-cycles/store-run-information-life-cycle'; import { StoreRunInformationLifeCycle } from './life-cycles/store-run-information-life-cycle';
import { createTaskHasher } from '../hasher/create-task-hasher'; import { createTaskHasher } from '../hasher/create-task-hasher';
import { TaskHistoryLifeCycle } from './life-cycles/task-history-life-cycle';
import { isNxCloudUsed } from '../utils/nx-cloud-utils';
async function getTerminalOutputLifeCycle( async function getTerminalOutputLifeCycle(
initiatingProject: string, initiatingProject: string,
@ -325,6 +328,9 @@ function constructLifeCycles(lifeCycle: LifeCycle) {
if (process.env.NX_PROFILE) { if (process.env.NX_PROFILE) {
lifeCycles.push(new TaskProfilingLifeCycle(process.env.NX_PROFILE)); lifeCycles.push(new TaskProfilingLifeCycle(process.env.NX_PROFILE));
} }
if (!isNxCloudUsed(readNxJson())) {
lifeCycles.push(new TaskHistoryLifeCycle());
}
return lifeCycles; return lifeCycles;
} }

View File

@ -159,7 +159,7 @@ export class TaskOrchestrator {
); );
} }
this.options.lifeCycle.scheduleTask(task); await this.options.lifeCycle.scheduleTask(task);
return taskSpecificEnv; return taskSpecificEnv;
} }
@ -176,7 +176,7 @@ export class TaskOrchestrator {
this.batchEnv this.batchEnv
); );
} }
this.options.lifeCycle.scheduleTask(task); await this.options.lifeCycle.scheduleTask(task);
}) })
); );
} }
@ -520,7 +520,7 @@ export class TaskOrchestrator {
// region Lifecycle // region Lifecycle
private async preRunSteps(tasks: Task[], metadata: TaskMetadata) { private async preRunSteps(tasks: Task[], metadata: TaskMetadata) {
this.options.lifeCycle.startTasks(tasks, metadata); await this.options.lifeCycle.startTasks(tasks, metadata);
} }
private async postRunSteps( private async postRunSteps(
@ -573,7 +573,7 @@ export class TaskOrchestrator {
'cache-results-end' 'cache-results-end'
); );
} }
this.options.lifeCycle.endTasks( await this.options.lifeCycle.endTasks(
results.map((result) => { results.map((result) => {
const code = const code =
result.status === 'success' || result.status === 'success' ||

View File

@ -0,0 +1,3 @@
export function serializeTarget(project, target, configuration) {
return [project, target, configuration].filter((part) => !!part).join(':');
}

View File

@ -0,0 +1,114 @@
import { appendFileSync, existsSync, readFileSync, writeFileSync } from 'fs';
import { join } from 'path';
import { daemonClient } from '../daemon/client/client';
import { isOnDaemon } from '../daemon/is-on-daemon';
import { workspaceDataDirectory } from './cache-directory';
const taskRunKeys = [
'project',
'target',
'configuration',
'hash',
'code',
'status',
'start',
'end',
] as const;
export type TaskRun = Record<(typeof taskRunKeys)[number], string>;
let taskHistory: TaskRun[] | undefined = undefined;
let taskHashToIndicesMap: Map<string, number[]> = new Map();
export async function getHistoryForHashes(hashes: string[]): Promise<{
[hash: string]: TaskRun[];
}> {
if (isOnDaemon() || !daemonClient.enabled()) {
if (taskHistory === undefined) {
loadTaskHistoryFromDisk();
}
const result: { [hash: string]: TaskRun[] } = {};
for (let hash of hashes) {
const indices = taskHashToIndicesMap.get(hash);
if (!indices) {
result[hash] = [];
} else {
result[hash] = indices.map((index) => taskHistory[index]);
}
}
return result;
}
return await daemonClient.getTaskHistoryForHashes(hashes);
}
export async function writeTaskRunsToHistory(
taskRuns: TaskRun[]
): Promise<void> {
if (isOnDaemon() || !daemonClient.enabled()) {
if (taskHistory === undefined) {
loadTaskHistoryFromDisk();
}
const serializedLines: string[] = [];
for (let taskRun of taskRuns) {
const serializedLine = taskRunKeys.map((key) => taskRun[key]).join(',');
serializedLines.push(serializedLine);
recordTaskRunInMemory(taskRun);
}
if (!existsSync(taskHistoryFile)) {
writeFileSync(taskHistoryFile, `${taskRunKeys.join(',')}\n`);
}
appendFileSync(taskHistoryFile, serializedLines.join('\n') + '\n');
} else {
await daemonClient.writeTaskRunsToHistory(taskRuns);
}
}
export const taskHistoryFile = join(workspaceDataDirectory, 'task-history.csv');
function loadTaskHistoryFromDisk() {
taskHashToIndicesMap.clear();
taskHistory = [];
if (!existsSync(taskHistoryFile)) {
return;
}
const fileContent = readFileSync(taskHistoryFile, 'utf8');
if (!fileContent) {
return;
}
const lines = fileContent.split('\n');
// if there are no lines or just the header, return
if (lines.length <= 1) {
return;
}
const contentLines = lines.slice(1).filter((l) => l.trim() !== '');
// read the values from csv format where each header is a key and the value is the value
for (let line of contentLines) {
const values = line.trim().split(',');
const run: Partial<TaskRun> = {};
taskRunKeys.forEach((header, index) => {
run[header] = values[index];
});
recordTaskRunInMemory(run as TaskRun);
}
}
function recordTaskRunInMemory(taskRun: TaskRun) {
const index = taskHistory.push(taskRun) - 1;
if (taskHashToIndicesMap.has(taskRun.hash)) {
taskHashToIndicesMap.get(taskRun.hash).push(index);
} else {
taskHashToIndicesMap.set(taskRun.hash, [index]);
}
}

View File

@ -22,6 +22,9 @@ export function editTarget(targetString: string, callback) {
return serializeTarget(callback(parsedTarget)); return serializeTarget(callback(parsedTarget));
} }
/**
* @deprecated use the utility from nx/src/utils instead
*/
export function serializeTarget({ project, target, config }) { export function serializeTarget({ project, target, config }) {
return [project, target, config].filter((part) => !!part).join(':'); return [project, target, config].filter((part) => !!part).join(':');
} }