feat(core): add daemon to the context provided to task runners

This commit is contained in:
Victor Savkin 2022-08-17 14:45:02 -04:00
parent 34fdbc0de2
commit 1a31018ea2
12 changed files with 247 additions and 184 deletions

View File

@ -942,6 +942,7 @@ stored in the daemon process. To reset both run: `nx reset`.
| `tasks` | [`Task`](../../devkit/index#task)[] |
| `options` | [`DefaultTasksRunnerOptions`](../../devkit/index#defaulttasksrunneroptions) |
| `context?` | `Object` |
| `context.daemon?` | `DaemonClient` |
| `context.hasher?` | [`Hasher`](../../devkit/index#hasher) |
| `context.initiatingProject?` | `string` |
| `context.nxArgs` | `NxArgs` |

File diff suppressed because one or more lines are too long

View File

@ -63,7 +63,7 @@
"@nrwl/eslint-plugin-nx": "14.6.0-beta.3",
"@nrwl/jest": "14.6.0-beta.3",
"@nrwl/next": "14.6.0-beta.3",
"@nrwl/nx-cloud": "14.4.1",
"@nrwl/nx-cloud": "14.4.2-beta.2",
"@nrwl/react": "14.6.0-beta.3",
"@nrwl/web": "14.6.0-beta.3",
"@parcel/watcher": "2.0.4",

View File

@ -1,6 +1,6 @@
import { workspaceRoot } from '../../utils/workspace-root';
import { ChildProcess, spawn, spawnSync } from 'child_process';
import { openSync, readFileSync } from 'fs';
import { openSync, readFileSync, statSync } from 'fs';
import { ensureDirSync, ensureFileSync } from 'fs-extra';
import { connect } from 'net';
import { join } from 'path';
@ -14,8 +14,12 @@ import { FULL_OS_SOCKET_PATH, killSocketOrPath } from '../socket-utils';
import {
DAEMON_DIR_FOR_CURRENT_WORKSPACE,
DAEMON_OUTPUT_LOG_FILE,
isDaemonDisabled,
} from '../tmp-dir';
import { ProjectGraph } from '../../config/project-graph';
import { isCI } from '../../utils/is-ci';
import { readNxJson } from '../../config/configuration';
import { NxJsonConfiguration } from 'nx/src/config/nx-json';
const DAEMON_ENV_SETTINGS = {
...process.env,
@ -23,6 +27,64 @@ const DAEMON_ENV_SETTINGS = {
NX_CACHE_WORKSPACE_CONFIG: 'false',
};
export class DaemonClient {
constructor(private readonly nxJson: NxJsonConfiguration) {}
enabled() {
const useDaemonProcessOption =
this.nxJson.tasksRunnerOptions?.['default']?.options?.useDaemonProcess;
const env = process.env.NX_DAEMON;
// env takes precedence
// option=true,env=false => no daemon
// option=false,env=undefined => no daemon
// option=false,env=false => no daemon
// option=undefined,env=undefined => daemon
// option=true,env=true => daemon
// option=false,env=true => daemon
if (
isCI() ||
isDocker() ||
isDaemonDisabled() ||
(useDaemonProcessOption === undefined && env === 'false') ||
(useDaemonProcessOption === true && env === 'false') ||
(useDaemonProcessOption === false && env === undefined) ||
(useDaemonProcessOption === false && env === 'false')
) {
return false;
}
return true;
}
async getProjectGraph(): Promise<ProjectGraph> {
if (!(await isServerAvailable())) {
await startInBackground();
}
return sendMessageToDaemon({ type: 'REQUEST_PROJECT_GRAPH' });
}
async processInBackground(requirePath: string, data: any): Promise<any> {
if (!(await isServerAvailable())) {
await startInBackground();
}
return sendMessageToDaemon({
type: 'PROCESS_IN_BACKGROUND',
requirePath,
data,
});
}
}
function isDocker() {
try {
statSync('/.dockerenv');
return true;
} catch {
return false;
}
}
export async function startInBackground(): Promise<ChildProcess['pid']> {
await safelyCleanUpExistingProcess();
ensureDirSync(DAEMON_DIR_FOR_CURRENT_WORKSPACE);
@ -136,19 +198,13 @@ export async function isServerAvailable(): Promise<boolean> {
});
}
/**
* Establishes a client connection to the daemon server for use in project graph
* creation utilities.
*
* All logs are performed by the devkit logger because this logic does not
* run "on the server" per se and therefore does not write to its log output.
*
* TODO: Gracefully handle a server shutdown (for whatever reason) while a client
* is connecting and querying it.
*/
export async function getProjectGraphFromServer(): Promise<ProjectGraph> {
async function sendMessageToDaemon(message: {
type: string;
requirePath?: string;
data?: any;
}): Promise<any> {
return new Promise((resolve, reject) => {
performance.mark('getProjectGraphFromServer-start');
performance.mark('sendMessageToDaemon-start');
const socket = connect(FULL_OS_SOCKET_PATH);
socket.on('error', (err) => {
@ -156,7 +212,7 @@ export async function getProjectGraphFromServer(): Promise<ProjectGraph> {
return reject(err);
}
if (err.message.startsWith('LOCK-FILES-CHANGED')) {
return getProjectGraphFromServer().then(resolve, reject);
return sendMessageToDaemon(message).then(resolve, reject);
}
let error: any;
if (err.message.startsWith('connect ENOENT')) {
@ -174,54 +230,47 @@ export async function getProjectGraphFromServer(): Promise<ProjectGraph> {
return reject(error || err);
});
/**
* Immediately after connecting to the server we send it the known project graph creation
* request payload. See the notes above createServer() for more context as to why we explicitly
* request the graph from the client like this.
*/
socket.on('connect', () => {
socket.write('REQUEST_PROJECT_GRAPH_PAYLOAD');
socket.write(JSON.stringify(message));
let serializedProjectGraphResult = '';
let serializedResult = '';
socket.on('data', (data) => {
serializedProjectGraphResult += data.toString();
serializedResult += data.toString();
});
socket.on('end', () => {
try {
performance.mark('json-parse-start');
const projectGraphResult = JSON.parse(serializedProjectGraphResult);
const parsedResult = JSON.parse(serializedResult);
performance.mark('json-parse-end');
performance.measure(
'deserialize graph result on the client',
'deserialize daemon response',
'json-parse-start',
'json-parse-end'
);
if (projectGraphResult.error) {
reject(projectGraphResult.error);
if (parsedResult.error) {
reject(parsedResult.error);
} else {
performance.measure(
'total for getProjectGraphFromServer()',
'getProjectGraphFromServer-start',
'total for sendMessageToDaemon()',
'sendMessageToDaemon-start',
'json-parse-end'
);
return resolve(projectGraphResult.projectGraph);
return resolve(parsedResult.projectGraph);
}
} catch (e) {
const endOfGraph =
serializedProjectGraphResult.length > 300
? serializedProjectGraphResult.substring(
serializedProjectGraphResult.length - 300
)
: serializedProjectGraphResult;
const endOfResponse =
serializedResult.length > 300
? serializedResult.substring(serializedResult.length - 300)
: serializedResult;
reject(
daemonProcessException(
[
'Could not deserialize project graph.',
'Could not deserialize response from Nx deamon.',
`Message: ${e.message}`,
'\n',
`Received:`,
endOfGraph,
endOfResponse,
'\n',
].join('\n')
)

View File

@ -0,0 +1,27 @@
import { respondWithErrorAndExit } from './shutdown-utils';
export async function handleProcessInBackground(
socket,
payload: { type: string; requirePath: string; data: any }
) {
let fn;
try {
fn = require(payload.requirePath);
} catch (e) {
await respondWithErrorAndExit(
socket,
`Unable to require ${payload.requirePath}`,
new Error(`Unable to require ${payload.requirePath}`)
);
}
try {
await fn(socket, payload.data);
} catch (e) {
await respondWithErrorAndExit(
socket,
`Error when processing ${payload.type}.`,
new Error(`Error when processing ${payload.type}. Message: ${e.message}`)
);
}
}

View File

@ -0,0 +1,63 @@
import { performance } from 'perf_hooks';
import { serializeResult } from '../socket-utils';
import { serverLogger } from './logger';
import { getCachedSerializedProjectGraphPromise } from './project-graph-incremental-recomputation';
import { respondWithErrorAndExit } from './shutdown-utils';
export async function handleRequestProjectGraph(socket) {
performance.mark('server-connection');
serverLogger.requestLog('Client Request for Project Graph Received');
const result = await getCachedSerializedProjectGraphPromise();
if (result.error) {
await respondWithErrorAndExit(
socket,
`Error when preparing serialized project graph.`,
result.error
);
}
const serializedResult = serializeResult(
result.error,
result.serializedProjectGraph
);
if (!serializedResult) {
await respondWithErrorAndExit(
socket,
`Error when serializing project graph result.`,
new Error(
'Critical error when serializing server result, check server logs'
)
);
}
performance.mark('serialized-project-graph-ready');
performance.measure(
'total for creating and serializing project graph',
'server-connection',
'serialized-project-graph-ready'
);
socket.write(serializedResult, () => {
performance.mark('serialized-project-graph-written-to-client');
performance.measure(
'write project graph to socket',
'serialized-project-graph-ready',
'serialized-project-graph-written-to-client'
);
// Close the connection once all data has been written so that the client knows when to read it.
socket.end();
performance.measure(
'total for server response',
'server-connection',
'serialized-project-graph-written-to-client'
);
const bytesWritten = Buffer.byteLength(
result.serializedProjectGraph,
'utf-8'
);
serverLogger.requestLog(
`Closed Connection to Client (${bytesWritten} bytes transferred)`
);
});
}

View File

@ -1,7 +1,7 @@
import { workspaceRoot } from '../../utils/workspace-root';
import { createServer, Server, Socket } from 'net';
import { join } from 'path';
import { performance, PerformanceObserver } from 'perf_hooks';
import { PerformanceObserver } from 'perf_hooks';
import {
FULL_OS_SOCKET_PATH,
isWindows,
@ -12,6 +12,7 @@ import { serverLogger } from './logger';
import {
handleServerProcessTermination,
resetInactivityTimeout,
respondWithErrorAndExit,
SERVER_INACTIVITY_TIMEOUT_MS,
} from './shutdown-utils';
import {
@ -20,48 +21,17 @@ import {
SubscribeToWorkspaceChangesCallback,
WatcherSubscription,
} from './watcher';
import {
addUpdatedAndDeletedFiles,
getCachedSerializedProjectGraphPromise,
} from './project-graph-incremental-recomputation';
import { addUpdatedAndDeletedFiles } from './project-graph-incremental-recomputation';
import { existsSync, statSync } from 'fs';
import { HashingImpl } from '../../hasher/hashing-impl';
import { defaultFileHasher } from '../../hasher/file-hasher';
function respondToClient(socket: Socket, message: string) {
return new Promise((res) => {
socket.write(message, () => {
// Close the connection once all data has been written so that the client knows when to read it.
socket.end();
serverLogger.log(`Closed Connection to Client`);
res(null);
});
});
}
import { handleRequestProjectGraph } from './handle-request-project-graph';
import { handleProcessInBackground } from './handle-process-in-background';
let watcherSubscription: WatcherSubscription | undefined;
let performanceObserver: PerformanceObserver | undefined;
let watcherError: Error | undefined;
async function respondWithErrorAndExit(
socket: Socket,
description: string,
error: Error
) {
// print some extra stuff in the error message
serverLogger.requestLog(
`Responding to the client with an error.`,
description,
error.message
);
console.error(error);
error.message = `${error.message}\n\nBecause of the error the Nx daemon process has exited. The next Nx command is going to restart the daemon process.\nIf the error persists, please run "nx reset".`;
await respondToClient(socket, serializeResult(error, null));
process.exit(1);
}
const server = createServer(async (socket) => {
resetInactivityTimeout(handleInactivityTimeout);
if (!performanceObserver) {
@ -90,70 +60,33 @@ const server = createServer(async (socket) => {
resetInactivityTimeout(handleInactivityTimeout);
const payload = data.toString();
if (payload !== 'REQUEST_PROJECT_GRAPH_PAYLOAD') {
const unparsedPayload = data.toString();
let payload;
try {
payload = JSON.parse(unparsedPayload);
} catch (e) {
await respondWithErrorAndExit(
socket,
`Invalid payload from the client`,
new Error(`Unsupported payload sent to daemon server: ${payload}`)
);
}
performance.mark('server-connection');
serverLogger.requestLog('Client Request for Project Graph Received');
const result = await getCachedSerializedProjectGraphPromise();
if (result.error) {
await respondWithErrorAndExit(
socket,
`Error when preparing serialized project graph.`,
result.error
);
}
const serializedResult = serializeResult(
result.error,
result.serializedProjectGraph
);
if (!serializedResult) {
await respondWithErrorAndExit(
socket,
`Error when serializing project graph result.`,
new Error(
'Critical error when serializing server result, check server logs'
`Unsupported payload sent to daemon server: ${unparsedPayload}`
)
);
}
performance.mark('serialized-project-graph-ready');
performance.measure(
'total for creating and serializing project graph',
'server-connection',
'serialized-project-graph-ready'
);
socket.write(serializedResult, () => {
performance.mark('serialized-project-graph-written-to-client');
performance.measure(
'write project graph to socket',
'serialized-project-graph-ready',
'serialized-project-graph-written-to-client'
if (payload.type === 'REQUEST_PROJECT_GRAPH') {
await handleRequestProjectGraph(socket);
} else if (payload.type === 'PROCESS_IN_BACKGROUND') {
await handleProcessInBackground(socket, payload);
} else {
await respondWithErrorAndExit(
socket,
`Invalid payload from the client`,
new Error(
`Unsupported payload sent to daemon server: ${unparsedPayload}`
)
);
// Close the connection once all data has been written so that the client knows when to read it.
socket.end();
performance.measure(
'total for server response',
'server-connection',
'serialized-project-graph-written-to-client'
);
const bytesWritten = Buffer.byteLength(
result.serializedProjectGraph,
'utf-8'
);
serverLogger.requestLog(
`Closed Connection to Client (${bytesWritten} bytes transferred)`
);
});
}
});
});

View File

@ -1,7 +1,8 @@
import { workspaceRoot } from '../../utils/workspace-root';
import type { Server } from 'net';
import type { Server, Socket } from 'net';
import { serverLogger } from './logger';
import type { WatcherSubscription } from './watcher';
import { serializeResult } from 'nx/src/daemon/socket-utils';
export const SERVER_INACTIVITY_TIMEOUT_MS = 10800000 as const; // 10800000 ms = 3 hours
@ -38,3 +39,33 @@ export function resetInactivityTimeout(cb: () => void): void {
}
serverInactivityTimerId = setTimeout(cb, SERVER_INACTIVITY_TIMEOUT_MS);
}
function respondToClient(socket: Socket, message: string) {
return new Promise((res) => {
socket.write(message, () => {
// Close the connection once all data has been written so that the client knows when to read it.
socket.end();
serverLogger.log(`Closed Connection to Client`);
res(null);
});
});
}
export async function respondWithErrorAndExit(
socket: Socket,
description: string,
error: Error
) {
// print some extra stuff in the error message
serverLogger.requestLog(
`Responding to the client with an error.`,
description,
error.message
);
console.error(error);
error.message = `${error.message}\n\nBecause of the error the Nx daemon process has exited. The next Nx command is going to restart the daemon process.\nIf the error persists, please run "nx reset".`;
await respondToClient(socket, serializeResult(error, null));
process.exit(1);
}

View File

@ -2,14 +2,8 @@ import { ProjectGraphCache, readCache } from './nx-deps-cache';
import { buildProjectGraph } from './build-project-graph';
import { workspaceFileName } from './file-utils';
import { output } from '../utils/output';
import { isCI } from '../utils/is-ci';
import { defaultFileHasher } from '../hasher/file-hasher';
import {
isDaemonDisabled,
markDaemonAsDisabled,
writeDaemonLogs,
} from '../daemon/tmp-dir';
import { statSync } from 'fs';
import { markDaemonAsDisabled, writeDaemonLogs } from '../daemon/tmp-dir';
import { ProjectGraph, ProjectGraphV4 } from '../config/project-graph';
import { stripIndents } from '../utils/strip-indents';
import { readNxJson } from '../config/configuration';
@ -17,6 +11,7 @@ import {
ProjectConfiguration,
ProjectsConfigurations,
} from '../config/workspace-json-project-json';
import { DaemonClient } from '../daemon/client/client';
/**
* Synchronously reads the latest cached copy of the workspace's ProjectGraph.
@ -108,35 +103,12 @@ async function buildProjectGraphWithoutDaemon() {
*/
export async function createProjectGraphAsync(): Promise<ProjectGraph> {
const nxJson = readNxJson();
const useDaemonProcessOption =
nxJson.tasksRunnerOptions?.['default']?.options?.useDaemonProcess;
const env = process.env.NX_DAEMON;
// env takes precedence
// option=true,env=false => no daemon
// option=false,env=undefined => no daemon
// option=false,env=false => no daemon
// option=undefined,env=undefined => daemon
// option=true,env=true => daemon
// option=false,env=true => daemon
if (
isCI() ||
isDocker() ||
isDaemonDisabled() ||
(useDaemonProcessOption === undefined && env === 'false') ||
(useDaemonProcessOption === true && env === 'false') ||
(useDaemonProcessOption === false && env === undefined) ||
(useDaemonProcessOption === false && env === 'false')
) {
const daemon = new DaemonClient(nxJson);
if (!daemon.enabled()) {
return await buildProjectGraphWithoutDaemon();
} else {
try {
const daemonClient = require('../daemon/client/client');
if (!(await daemonClient.isServerAvailable())) {
await daemonClient.startInBackground();
}
return daemonClient.getProjectGraphFromServer();
return daemon.getProjectGraph();
} catch (e) {
if (e.message.indexOf('inotify_add_watch') > -1) {
// common errors with the daemon due to OS settings (cannot watch all the files available)
@ -164,23 +136,6 @@ export async function createProjectGraphAsync(): Promise<ProjectGraph> {
}
}
function isDocker() {
try {
statSync('/.dockerenv');
return true;
} catch {
return false;
}
}
function printErrorMessage(e: any) {
const lines = e.message.split('\n');
output.error({
title: lines[0],
bodyLines: lines.slice(1),
});
}
/**
* Backwards compatibility adapter for project graph
* @param {string} sourceVersion

View File

@ -27,6 +27,7 @@ import { handleErrors } from '../utils/params';
import { Workspaces } from 'nx/src/config/workspaces';
import { Hasher } from 'nx/src/hasher/hasher';
import { hashDependsOnOtherTasks, hashTask } from 'nx/src/hasher/hash-task';
import { DaemonClient } from '../daemon/client/client';
async function getTerminalOutputLifeCycle(
initiatingProject: string,
@ -192,6 +193,7 @@ export async function runCommand(
nxArgs,
taskGraph,
hasher,
daemon: new DaemonClient(nxJson),
}
);
let anyFailures;

View File

@ -3,6 +3,7 @@ import { ProjectGraph } from '../config/project-graph';
import { Task, TaskGraph } from '../config/task-graph';
import { NxArgs } from '../utils/command-line-utils';
import { Hasher } from '../hasher/hasher';
import { DaemonClient } from '../daemon/client/client';
export type TaskStatus =
| 'success'
@ -27,5 +28,6 @@ export type TasksRunner<T = unknown> = (
nxArgs: NxArgs;
taskGraph?: TaskGraph;
hasher?: Hasher;
daemon?: DaemonClient;
}
) => any | Promise<{ [id: string]: TaskStatus }>;

View File

@ -3738,10 +3738,10 @@
url-loader "^4.1.1"
webpack-merge "^5.8.0"
"@nrwl/nx-cloud@14.4.1":
version "14.4.1"
resolved "https://registry.yarnpkg.com/@nrwl/nx-cloud/-/nx-cloud-14.4.1.tgz#a4d8e9ce6e5bbb753916adadaf0e23ca24b54823"
integrity sha512-vlWpBmIGfYvB9XMAdDZWOihOTFPE2VV9CDeZzBbSMF32KxDqUkhfaLf3dg6puIeUPkPbj5k+V57xjAl7g9k+Xw==
"@nrwl/nx-cloud@14.4.2-beta.2":
version "14.4.2-beta.2"
resolved "https://registry.yarnpkg.com/@nrwl/nx-cloud/-/nx-cloud-14.4.2-beta.2.tgz#9e913c9fc182827492aa6960bda5066ff50c700a"
integrity sha512-sJcxDFGqAcehLaE5DeWokuAKZmSAOI+b1k2w+9QgXNYVQD7dbS+rP0qo+32W+PYMElDWfwizOszKiaCNF7dHDw==
dependencies:
axios "^0.21.1"
chalk "4.1.0"