fix(core): rework the daemon process communication to use a queue and a long running connection

This commit is contained in:
Victor Savkin 2022-09-13 13:49:12 -04:00
parent aaad9a4cf4
commit a3e480b6d7
18 changed files with 453 additions and 295 deletions

View File

@ -17,12 +17,6 @@ Install `nx` globally to invoke the command directly using `nx`, or use `npx nx`
## Options
### background
Type: boolean
Default: true
### help
Type: boolean
@ -35,6 +29,12 @@ Type: boolean
Default: false
### stop
Type: boolean
Default: false
### version
Type: boolean

View File

@ -1,8 +1,9 @@
---
title: "migrate - CLI command"
description: "Creates a migrations file or runs migrations from the migrations file.
- Migrate packages and create migrations.json (e.g., nx migrate @nrwl/workspace@latest)
- Run migrations (e.g., nx migrate --run-migrations=migrations.json)"
title: 'migrate - CLI command'
description:
'Creates a migrations file or runs migrations from the migrations file.
- Migrate packages and create migrations.json (e.g., nx migrate @nrwl/workspace@latest)
- Run migrations (e.g., nx migrate --run-migrations=migrations.json)'
---
# migrate

View File

@ -34,7 +34,7 @@
"name": "daemon",
"id": "daemon",
"file": "generated/cli/daemon",
"content": "---\ntitle: 'daemon - CLI command'\ndescription: 'Prints information about the Nx Daemon process or starts a daemon process'\n---\n\n# daemon\n\nPrints information about the Nx Daemon process or starts a daemon process\n\n## Usage\n\n```bash\nnx daemon\n```\n\nInstall `nx` globally to invoke the command directly using `nx`, or use `npx nx`, `yarn nx`, or `pnpx nx`.\n\n## Options\n\n### background\n\nType: boolean\n\nDefault: true\n\n### help\n\nType: boolean\n\nShow help\n\n### start\n\nType: boolean\n\nDefault: false\n\n### version\n\nType: boolean\n\nShow version number\n"
"content": "---\ntitle: 'daemon - CLI command'\ndescription: 'Prints information about the Nx Daemon process or starts a daemon process'\n---\n\n# daemon\n\nPrints information about the Nx Daemon process or starts a daemon process\n\n## Usage\n\n```bash\nnx daemon\n```\n\nInstall `nx` globally to invoke the command directly using `nx`, or use `npx nx`, `yarn nx`, or `pnpx nx`.\n\n## Options\n\n### help\n\nType: boolean\n\nShow help\n\n### start\n\nType: boolean\n\nDefault: false\n\n### stop\n\nType: boolean\n\nDefault: false\n\n### version\n\nType: boolean\n\nShow version number\n"
},
{
"name": "graph",
@ -94,7 +94,7 @@
"name": "migrate",
"id": "migrate",
"file": "generated/cli/migrate",
"content": "---\ntitle: \"migrate - CLI command\"\ndescription: \"Creates a migrations file or runs migrations from the migrations file.\n- Migrate packages and create migrations.json (e.g., nx migrate @nrwl/workspace@latest)\n- Run migrations (e.g., nx migrate --run-migrations=migrations.json)\"\n---\n\n# migrate\n\nCreates a migrations file or runs migrations from the migrations file.\n\n- Migrate packages and create migrations.json (e.g., nx migrate @nrwl/workspace@latest)\n- Run migrations (e.g., nx migrate --run-migrations=migrations.json)\n\n## Usage\n\n```bash\nnx migrate [packageAndVersion]\n```\n\nInstall `nx` globally to invoke the command directly using `nx`, or use `npx nx`, `yarn nx`, or `pnpx nx`.\n\n### Examples\n\nUpdate @nrwl/workspace to \"next\". This will update other packages and will generate migrations.json:\n\n```bash\nnx migrate next\n```\n\nUpdate @nrwl/workspace to \"9.0.0\". This will update other packages and will generate migrations.json:\n\n```bash\nnx migrate 9.0.0\n```\n\nUpdate @nrwl/workspace and generate the list of migrations starting with version 8.0.0 of @nrwl/workspace and @nrwl/node, regardless of what installed locally:\n\n```bash\nnx migrate @nrwl/workspace@9.0.0 --from=\"@nrwl/workspace@8.0.0,@nrwl/node@8.0.0\"\n```\n\nUpdate @nrwl/workspace to \"9.0.0\". If it tries to update @nrwl/react or @nrwl/angular, use version \"9.0.1\":\n\n```bash\nnx migrate @nrwl/workspace@9.0.0 --to=\"@nrwl/react@9.0.1,@nrwl/angular@9.0.1\"\n```\n\nUpdate another-package to \"12.0.0\". This will update other packages and will generate migrations.json file:\n\n```bash\nnx migrate another-package@12.0.0\n```\n\nRun migrations from the provided migrations.json file. You can modify migrations.json and run this command many times:\n\n```bash\nnx migrate --run-migrations=migrations.json\n```\n\nCreate a dedicated commit for each successfully completed migration. You can customize the prefix used for each commit by additionally setting --commit-prefix=\"PREFIX_HERE \":\n\n```bash\nnx migrate --run-migrations --create-commits\n```\n\n## Options\n\n### commitPrefix\n\nType: string\n\nDefault: chore: [nx migration]\n\nCommit prefix to apply to the commit for each migration, when --create-commits is enabled\n\n### createCommits\n\nType: boolean\n\nDefault: false\n\nAutomatically create a git commit after each migration runs\n\n### from\n\nType: string\n\nUse the provided versions for packages instead of the ones installed in node_modules (e.g., --from=\"@nrwl/react:12.0.0,@nrwl/js:12.0.0\")\n\n### help\n\nType: boolean\n\nShow help\n\n### packageAndVersion\n\nType: string\n\nThe target package and version (e.g, @nrwl/workspace@13.0.0)\n\n### runMigrations\n\nType: string\n\nExecute migrations from a file (when the file isn't provided, execute migrations from migrations.json)\n\n### to\n\nType: string\n\nUse the provided versions for packages instead of the ones calculated by the migrator (e.g., --to=\"@nrwl/react:12.0.0,@nrwl/js:12.0.0\")\n\n### version\n\nType: boolean\n\nShow version number\n"
"content": "---\ntitle: 'migrate - CLI command'\ndescription:\n 'Creates a migrations file or runs migrations from the migrations file.\n - Migrate packages and create migrations.json (e.g., nx migrate @nrwl/workspace@latest)\n - Run migrations (e.g., nx migrate --run-migrations=migrations.json)'\n---\n\n# migrate\n\nCreates a migrations file or runs migrations from the migrations file.\n\n- Migrate packages and create migrations.json (e.g., nx migrate @nrwl/workspace@latest)\n- Run migrations (e.g., nx migrate --run-migrations=migrations.json)\n\n## Usage\n\n```bash\nnx migrate [packageAndVersion]\n```\n\nInstall `nx` globally to invoke the command directly using `nx`, or use `npx nx`, `yarn nx`, or `pnpx nx`.\n\n### Examples\n\nUpdate @nrwl/workspace to \"next\". This will update other packages and will generate migrations.json:\n\n```bash\nnx migrate next\n```\n\nUpdate @nrwl/workspace to \"9.0.0\". This will update other packages and will generate migrations.json:\n\n```bash\nnx migrate 9.0.0\n```\n\nUpdate @nrwl/workspace and generate the list of migrations starting with version 8.0.0 of @nrwl/workspace and @nrwl/node, regardless of what installed locally:\n\n```bash\nnx migrate @nrwl/workspace@9.0.0 --from=\"@nrwl/workspace@8.0.0,@nrwl/node@8.0.0\"\n```\n\nUpdate @nrwl/workspace to \"9.0.0\". If it tries to update @nrwl/react or @nrwl/angular, use version \"9.0.1\":\n\n```bash\nnx migrate @nrwl/workspace@9.0.0 --to=\"@nrwl/react@9.0.1,@nrwl/angular@9.0.1\"\n```\n\nUpdate another-package to \"12.0.0\". This will update other packages and will generate migrations.json file:\n\n```bash\nnx migrate another-package@12.0.0\n```\n\nRun migrations from the provided migrations.json file. You can modify migrations.json and run this command many times:\n\n```bash\nnx migrate --run-migrations=migrations.json\n```\n\nCreate a dedicated commit for each successfully completed migration. You can customize the prefix used for each commit by additionally setting --commit-prefix=\"PREFIX_HERE \":\n\n```bash\nnx migrate --run-migrations --create-commits\n```\n\n## Options\n\n### commitPrefix\n\nType: string\n\nDefault: chore: [nx migration]\n\nCommit prefix to apply to the commit for each migration, when --create-commits is enabled\n\n### createCommits\n\nType: boolean\n\nDefault: false\n\nAutomatically create a git commit after each migration runs\n\n### from\n\nType: string\n\nUse the provided versions for packages instead of the ones installed in node_modules (e.g., --from=\"@nrwl/react:12.0.0,@nrwl/js:12.0.0\")\n\n### help\n\nType: boolean\n\nShow help\n\n### packageAndVersion\n\nType: string\n\nThe target package and version (e.g, @nrwl/workspace@13.0.0)\n\n### runMigrations\n\nType: string\n\nExecute migrations from a file (when the file isn't provided, execute migrations from migrations.json)\n\n### to\n\nType: string\n\nUse the provided versions for packages instead of the ones calculated by the migrator (e.g., --to=\"@nrwl/react:12.0.0,@nrwl/js:12.0.0\")\n\n### version\n\nType: boolean\n\nShow version number\n"
},
{
"name": "report",

View File

@ -2,14 +2,14 @@ import { buildProjectGraphWithoutDaemon } from '../src/project-graph/project-gra
import { workspaceRoot } from '../src/utils/workspace-root';
import { fileExists } from '../src/utils/fileutils';
import { join } from 'path';
import { isServerAvailable, stop } from '../src/daemon/client/client';
import { daemonClient } from '../src/daemon/client/client';
(async () => {
try {
if (fileExists(join(workspaceRoot, 'nx.json'))) {
if (await isServerAvailable()) {
await stop();
}
try {
await daemonClient.stop();
} catch (e) {}
const b = new Date();
await buildProjectGraphWithoutDaemon();
const a = new Date();

View File

@ -5,13 +5,8 @@ import { generateDaemonHelpOutput } from '../daemon/client/generate-help-output'
export async function daemonHandler(args: Arguments) {
if (args.start) {
const { startInBackground, startInCurrentProcess } = await import(
'../daemon/client/client'
);
if (!args.background) {
return startInCurrentProcess();
}
const pid = await startInBackground();
const { daemonClient } = await import('../daemon/client/client');
const pid = await daemonClient.startInBackground();
output.log({
title: `Daemon Server - Started in a background process...`,
bodyLines: [
@ -20,6 +15,9 @@ export async function daemonHandler(args: Arguments) {
)} ${DAEMON_OUTPUT_LOG_FILE}\n`,
],
});
} else if (args.stop) {
const { daemonClient } = await import('../daemon/client/client');
daemonClient.stop();
} else {
console.log(generateDaemonHelpOutput());
}

View File

@ -145,8 +145,10 @@ export const commandsObject = yargs
withAffectedOptions(withPlainOption(yargs)),
'affected:apps'
),
handler: async (args) =>
(await import('./affected')).affected('apps', { ...args }),
handler: async (args) => {
await (await import('./affected')).affected('apps', { ...args });
process.exit(0);
},
})
.command({
command: 'affected:libs',
@ -158,10 +160,14 @@ export const commandsObject = yargs
withAffectedOptions(withPlainOption(yargs)),
'affected:libs'
),
handler: async (args) =>
(await import('./affected')).affected('libs', {
handler: async (args) => {
await (
await import('./affected')
).affected('libs', {
...args,
}),
});
process.exit(0);
},
})
.command({
command: 'affected:graph',
@ -172,10 +178,14 @@ export const commandsObject = yargs
withAffectedOptions(withDepGraphOptions(yargs)),
'affected:graph'
),
handler: async (args) =>
(await import('./affected')).affected('graph', {
handler: async (args) => {
await (
await import('./affected')
).affected('graph', {
...args,
}),
});
process.exit(0);
},
})
.command({
command: 'print-affected',
@ -186,18 +196,19 @@ export const commandsObject = yargs
withAffectedOptions(withPrintAffectedOptions(yargs)),
'print-affected'
),
handler: async (args) =>
(await import('./affected')).affected(
'print-affected',
withOverrides(args)
),
handler: async (args) => {
await (
await import('./affected')
).affected('print-affected', withOverrides(args));
process.exit(0);
},
})
.command({
command: 'daemon',
describe:
'Prints information about the Nx Daemon process or starts a daemon process',
builder: (yargs) =>
linkToNxDevAndExamples(withDaemonStartOptions(yargs), 'daemon'),
linkToNxDevAndExamples(withDaemonOptions(yargs), 'daemon'),
handler: async (args) => (await import('./daemon')).daemonHandler(args),
})
@ -207,8 +218,10 @@ export const commandsObject = yargs
aliases: ['dep-graph'],
builder: (yargs) =>
linkToNxDevAndExamples(withDepGraphOptions(yargs), 'dep-graph'),
handler: async (args) =>
(await import('./dep-graph')).generateGraph(args as any, []),
handler: async (args) => {
await (await import('./dep-graph')).generateGraph(args as any, []);
process.exit(0);
},
})
.command({
@ -216,7 +229,10 @@ export const commandsObject = yargs
describe: 'Check for un-formatted files',
builder: (yargs) =>
linkToNxDevAndExamples(withFormatOptions(yargs), 'format:check'),
handler: async (args) => (await import('./format')).format('check', args),
handler: async (args) => {
await (await import('./format')).format('check', args);
process.exit(0);
},
})
.command({
command: 'format:write',
@ -224,12 +240,18 @@ export const commandsObject = yargs
aliases: ['format'],
builder: (yargs) =>
linkToNxDevAndExamples(withFormatOptions(yargs), 'format:write'),
handler: async (args) => (await import('./format')).format('write', args),
handler: async (args) => {
await (await import('./format')).format('write', args);
process.exit(0);
},
})
.command({
command: 'workspace-lint [files..]',
describe: 'Lint nx specific workspace files (nx.json, workspace.json)',
handler: async () => (await import('./lint')).workspaceLint(),
handler: async () => {
await (await import('./lint')).workspaceLint();
process.exit(0);
},
})
.command({
@ -241,37 +263,51 @@ export const commandsObject = yargs
await withWorkspaceGeneratorOptions(yargs),
'workspace-generator'
),
handler: async () =>
(await import('./workspace-generators')).workspaceGenerators(
process.argv.slice(3)
),
handler: async () => {
await (
await import('./workspace-generators')
).workspaceGenerators(process.argv.slice(3));
process.exit(0);
},
})
.command({
command: 'migrate [packageAndVersion]',
describe: `Creates a migrations file or runs migrations from the migrations file.
- Migrate packages and create migrations.json (e.g., nx migrate @nrwl/workspace@latest)
- Run migrations (e.g., nx migrate --run-migrations=migrations.json)`,
- Migrate packages and create migrations.json (e.g., nx migrate @nrwl/workspace@latest)
- Run migrations (e.g., nx migrate --run-migrations=migrations.json)`,
builder: (yargs) =>
linkToNxDevAndExamples(withMigrationOptions(yargs), 'migrate'),
handler: () => runMigration(),
handler: () => {
runMigration();
process.exit(0);
},
})
.command({
command: 'report',
describe:
'Reports useful version numbers to copy into the Nx issue template',
handler: async () => (await import('./report')).reportHandler(),
handler: async () => {
await (await import('./report')).reportHandler();
process.exit(0);
},
})
.command({
command: 'init',
describe: 'Adds nx.json file and installs nx if not installed already',
handler: async () => (await import('./init')).initHandler(),
handler: async () => {
await (await import('./init')).initHandler();
process.exit(0);
},
})
.command({
command: 'list [plugin]',
describe:
'Lists installed plugins, capabilities of installed plugins and other available plugins.',
builder: (yargs) => withListOptions(yargs),
handler: async (args: any) => (await import('./list')).listHandler(args),
handler: async (args: any) => {
await (await import('./list')).listHandler(args);
process.exit(0);
},
})
.command({
command: 'reset',
@ -284,8 +320,10 @@ export const commandsObject = yargs
command: 'connect-to-nx-cloud',
describe: `Makes sure the workspace is connected to Nx Cloud`,
builder: (yargs) => linkToNxDevAndExamples(yargs, 'connect-to-nx-cloud'),
handler: async () =>
(await import('./connect-to-nx-cloud')).connectToNxCloudCommand(),
handler: async () => {
await (await import('./connect-to-nx-cloud')).connectToNxCloudCommand();
process.exit(0);
},
})
.command({
command: 'new [_..]',
@ -342,12 +380,15 @@ function withFormatOptions(yargs: yargs.Argv): yargs.Argv {
});
}
function withDaemonStartOptions(yargs: yargs.Argv): yargs.Argv {
function withDaemonOptions(yargs: yargs.Argv): yargs.Argv {
return yargs
.option('background', { type: 'boolean', default: true })
.option('start', {
type: 'boolean',
default: false,
})
.option('stop', {
type: 'boolean',
default: false,
});
}

View File

@ -1,5 +1,5 @@
import { removeSync } from 'fs-extra';
import { stop as stopDaemon } from '../daemon/client/client';
import { daemonClient } from '../daemon/client/client';
import { cacheDir, projectGraphCacheDirectory } from '../utils/cache-directory';
import { output } from '../utils/output';
@ -8,7 +8,7 @@ export function resetHandler() {
title: 'Resetting the Nx workspace cache and stopping the Nx Daemon.',
bodyLines: [`This might take a few minutes.`],
});
stopDaemon();
daemonClient.stop();
removeSync(cacheDir);
if (projectGraphCacheDirectory !== cacheDir) {
removeSync(projectGraphCacheDirectory);

View File

@ -20,6 +20,8 @@ import {
import { ProjectGraph } from '../../config/project-graph';
import { isCI } from '../../utils/is-ci';
import { NxJsonConfiguration } from '../../config/nx-json';
import { readNxJson } from '../../config/configuration';
import { PromisedBasedQueue } from '../../utils/promised-based-queue';
const DAEMON_ENV_SETTINGS = {
...process.env,
@ -30,7 +32,16 @@ const DAEMON_ENV_SETTINGS = {
export class DaemonClient {
constructor(private readonly nxJson: NxJsonConfiguration) {}
private queue = new PromisedBasedQueue();
private socket = null;
private currentMessage = null;
private currentResolve = null;
private currentReject = null;
private _enabled: boolean | undefined;
private _connected: boolean = false;
enabled() {
if (this._enabled === undefined) {
@ -64,29 +75,20 @@ export class DaemonClient {
}
async getProjectGraph(): Promise<ProjectGraph> {
if (!(await isServerAvailable())) {
await startInBackground();
}
const r = await sendMessageToDaemon({ type: 'REQUEST_PROJECT_GRAPH' });
return r.projectGraph;
return (await this.sendToDaemonViaQueue({ type: 'REQUEST_PROJECT_GRAPH' }))
.projectGraph;
}
async processInBackground(requirePath: string, data: any): Promise<any> {
if (!(await isServerAvailable())) {
await startInBackground();
}
return sendMessageToDaemon({
processInBackground(requirePath: string, data: any): Promise<any> {
return this.sendToDaemonViaQueue({
type: 'PROCESS_IN_BACKGROUND',
requirePath,
data,
});
}
async recordOutputsHash(outputs: string[], hash: string): Promise<any> {
if (!(await isServerAvailable())) {
await startInBackground();
}
return sendMessageToDaemon({
recordOutputsHash(outputs: string[], hash: string): Promise<any> {
return this.sendToDaemonViaQueue({
type: 'RECORD_OUTPUTS_HASH',
data: {
outputs,
@ -95,11 +97,8 @@ export class DaemonClient {
});
}
async outputsHashesMatch(outputs: string[], hash: string): Promise<any> {
if (!(await isServerAvailable())) {
await startInBackground();
}
return sendMessageToDaemon({
outputsHashesMatch(outputs: string[], hash: string): Promise<any> {
return this.sendToDaemonViaQueue({
type: 'OUTPUTS_HASHES_MATCH',
data: {
outputs,
@ -107,8 +106,214 @@ export class DaemonClient {
},
});
}
async isServerAvailable(): Promise<boolean> {
return new Promise((resolve) => {
try {
const socket = connect(FULL_OS_SOCKET_PATH, () => {
socket.destroy();
resolve(true);
});
socket.once('error', () => {
resolve(false);
});
} catch (err) {
resolve(false);
}
});
}
private async sendToDaemonViaQueue(messageToDaemon: any): Promise<any> {
return this.queue.sendToQueue(() =>
this.sendMessageToDaemon(messageToDaemon)
);
}
private setUpConnection() {
this.socket = connect(FULL_OS_SOCKET_PATH);
this.socket.on('ready', () => {
let message = '';
this.socket.on('data', (data) => {
const chunk = data.toString();
if (chunk.length === 0 || chunk.codePointAt(chunk.length - 1) != 4) {
message += chunk;
} else {
message += chunk.substring(0, chunk.length - 1);
this.handleMessage(message);
message = '';
this.currentMessage = null;
this.currentResolve = null;
this.currentReject = null;
}
});
});
this.socket.on('close', () => {
output.error({
title: 'Daemon process terminated and closed the connection',
bodyLines: ['Please rerun the command, which will restart the daemon.'],
});
process.exit(1);
});
this.socket.on('error', (err) => {
if (!err.message) {
return this.currentReject(daemonProcessException(err.toString()));
}
if (err.message.startsWith('LOCK-FILES-CHANGED')) {
// retry the current message
// we cannot send it via the queue because we are in the middle of processing
// a message from the queue
return this.sendMessageToDaemon(this.currentMessage).then(
this.currentResolve,
this.currentReject
);
}
let error: any;
if (err.message.startsWith('connect ENOENT')) {
error = daemonProcessException('The Daemon Server is not running');
} else if (err.message.startsWith('connect ECONNREFUSED')) {
error = daemonProcessException(
`A server instance had not been fully shut down. Please try running the command again.`
);
killSocketOrPath();
} else if (err.message.startsWith('read ECONNRESET')) {
error = daemonProcessException(
`Unable to connect to the daemon process.`
);
} else {
error = daemonProcessException(err.toString());
}
return this.currentReject(error);
});
}
private async sendMessageToDaemon(message: any): Promise<any> {
if (!this._connected) {
this._connected = true;
if (!(await this.isServerAvailable())) {
await this.startInBackground();
}
this.setUpConnection();
}
return new Promise((resolve, reject) => {
performance.mark('sendMessageToDaemon-start');
this.currentMessage = message;
this.currentResolve = resolve;
this.currentReject = reject;
this.socket.write(JSON.stringify(message));
// send EOT to indicate that the message has been fully written
this.socket.write(String.fromCodePoint(4));
});
}
private handleMessage(serializedResult: string) {
try {
performance.mark('json-parse-start');
const parsedResult = JSON.parse(serializedResult);
performance.mark('json-parse-end');
performance.measure(
'deserialize daemon response',
'json-parse-start',
'json-parse-end'
);
if (parsedResult.error) {
this.currentReject(parsedResult.error);
} else {
performance.measure(
'total for sendMessageToDaemon()',
'sendMessageToDaemon-start',
'json-parse-end'
);
return this.currentResolve(parsedResult);
}
} catch (e) {
const endOfResponse =
serializedResult.length > 300
? serializedResult.substring(serializedResult.length - 300)
: serializedResult;
this.currentReject(
daemonProcessException(
[
'Could not deserialize response from Nx daemon.',
`Message: ${e.message}`,
'\n',
`Received:`,
endOfResponse,
'\n',
].join('\n')
)
);
}
}
async startInBackground(): Promise<ChildProcess['pid']> {
await safelyCleanUpExistingProcess();
ensureDirSync(DAEMON_DIR_FOR_CURRENT_WORKSPACE);
ensureFileSync(DAEMON_OUTPUT_LOG_FILE);
const out = openSync(DAEMON_OUTPUT_LOG_FILE, 'a');
const err = openSync(DAEMON_OUTPUT_LOG_FILE, 'a');
const backgroundProcess = spawn(
process.execPath,
[join(__dirname, '../server/start.js')],
{
cwd: workspaceRoot,
stdio: ['ignore', out, err],
detached: true,
windowsHide: true,
shell: false,
env: DAEMON_ENV_SETTINGS,
}
);
backgroundProcess.unref();
// Persist metadata about the background process so that it can be cleaned up later if needed
await writeDaemonJsonProcessCache({
processId: backgroundProcess.pid,
});
/**
* Ensure the server is actually available to connect to via IPC before resolving
*/
let attempts = 0;
return new Promise((resolve, reject) => {
const id = setInterval(async () => {
if (await this.isServerAvailable()) {
clearInterval(id);
resolve(backgroundProcess.pid);
} else if (attempts > 200) {
// daemon fails to start, the process probably exited
// we print the logs and exit the client
reject(
daemonProcessException('Failed to start the Nx Daemon process.')
);
} else {
attempts++;
}
}, 10);
});
}
stop(): void {
spawnSync(process.execPath, ['../server/stop.js'], {
cwd: __dirname,
stdio: 'inherit',
});
removeSocketDir();
output.log({ title: 'Daemon Server - Stopped' });
}
}
export const daemonClient = new DaemonClient(readNxJson());
function isDocker() {
try {
statSync('/.dockerenv');
@ -118,54 +323,6 @@ function isDocker() {
}
}
export async function startInBackground(): Promise<ChildProcess['pid']> {
await safelyCleanUpExistingProcess();
ensureDirSync(DAEMON_DIR_FOR_CURRENT_WORKSPACE);
ensureFileSync(DAEMON_OUTPUT_LOG_FILE);
const out = openSync(DAEMON_OUTPUT_LOG_FILE, 'a');
const err = openSync(DAEMON_OUTPUT_LOG_FILE, 'a');
const backgroundProcess = spawn(
process.execPath,
[join(__dirname, '../server/start.js')],
{
cwd: workspaceRoot,
stdio: ['ignore', out, err],
detached: true,
windowsHide: true,
shell: false,
env: DAEMON_ENV_SETTINGS,
}
);
backgroundProcess.unref();
// Persist metadata about the background process so that it can be cleaned up later if needed
await writeDaemonJsonProcessCache({
processId: backgroundProcess.pid,
});
/**
* Ensure the server is actually available to connect to via IPC before resolving
*/
let attempts = 0;
return new Promise((resolve, reject) => {
const id = setInterval(async () => {
if (await isServerAvailable()) {
clearInterval(id);
resolve(backgroundProcess.pid);
} else if (attempts > 200) {
// daemon fails to start, the process probably exited
// we print the logs and exit the client
reject(
daemonProcessException('Failed to start the Nx Daemon process.')
);
} else {
attempts++;
}
}, 10);
});
}
function daemonProcessException(message: string) {
try {
let log = readFileSync(DAEMON_OUTPUT_LOG_FILE).toString().split('\n');
@ -188,139 +345,3 @@ function daemonProcessException(message: string) {
return new Error(message);
}
}
export function startInCurrentProcess(): void {
output.log({
title: `Daemon Server - Starting in the current process...`,
});
spawnSync(process.execPath, [join(__dirname, '../server/start.js')], {
cwd: workspaceRoot,
stdio: 'inherit',
env: DAEMON_ENV_SETTINGS,
});
}
export function stop(): void {
spawnSync(process.execPath, ['../server/stop.js'], {
cwd: __dirname,
stdio: 'inherit',
});
removeSocketDir();
output.log({ title: 'Daemon Server - Stopped' });
}
/**
* As noted in the comments above the createServer() call, in order to reliably (meaning it works
* cross-platform) check whether the server is available to request a project graph from we
* need to actually attempt connecting to it.
*
* Because of the behavior of named pipes on Windows, we cannot simply treat them as a file and
* check for their existence on disk (unlike with Unix Sockets).
*/
export async function isServerAvailable(): Promise<boolean> {
return new Promise((resolve) => {
try {
const socket = connect(FULL_OS_SOCKET_PATH, () => {
socket.destroy();
resolve(true);
});
socket.once('error', () => {
resolve(false);
});
} catch (err) {
resolve(false);
}
});
}
async function sendMessageToDaemon(message: {
type: string;
requirePath?: string;
data?: any;
}): Promise<any> {
return new Promise((resolve, reject) => {
performance.mark('sendMessageToDaemon-start');
const socket = connect(FULL_OS_SOCKET_PATH);
socket.on('error', (err) => {
if (!err.message) {
return reject(daemonProcessException(err.toString()));
}
if (err.message.startsWith('LOCK-FILES-CHANGED')) {
return sendMessageToDaemon(message).then(resolve, reject);
}
let error: any;
if (err.message.startsWith('connect ENOENT')) {
error = daemonProcessException('The Daemon Server is not running');
} else if (err.message.startsWith('connect ECONNREFUSED')) {
error = daemonProcessException(
`A server instance had not been fully shut down. Please try running the command again.`
);
killSocketOrPath();
} else if (err.message.startsWith('read ECONNRESET')) {
error = daemonProcessException(
`Unable to connect to the daemon process.`
);
} else {
error = daemonProcessException(err.toString());
}
return reject(error);
});
socket.on('ready', () => {
socket.write(JSON.stringify(message));
// send EOT to indicate that the message has been fully written
socket.write(String.fromCodePoint(4));
let serializedResult = '';
socket.on('data', (data) => {
serializedResult += data.toString();
});
socket.on('end', () => {
try {
performance.mark('json-parse-start');
const parsedResult = JSON.parse(serializedResult);
performance.mark('json-parse-end');
performance.measure(
'deserialize daemon response',
'json-parse-start',
'json-parse-end'
);
if (parsedResult.error) {
reject(parsedResult.error);
} else {
performance.measure(
'total for sendMessageToDaemon()',
'sendMessageToDaemon-start',
'json-parse-end'
);
return resolve(parsedResult);
}
} catch (e) {
const endOfResponse =
serializedResult.length > 300
? serializedResult.substring(serializedResult.length - 300)
: serializedResult;
reject(
daemonProcessException(
[
'Could not deserialize response from Nx daemon.',
`Message: ${e.message}`,
'\n',
`Received:`,
endOfResponse,
'\n',
].join('\n')
)
);
}
});
});
});
}

View File

@ -1,8 +1,8 @@
import { isServerAvailable } from './client';
import { daemonClient } from './client';
(async () => {
try {
console.log(await isServerAvailable());
console.log(await daemonClient.isServerAvailable());
} catch {
console.log(false);
}

View File

@ -11,7 +11,7 @@ export function generateDaemonHelpOutput(): string {
cwd: __dirname,
});
const isServerAvailable = res?.stdout?.toString().trim() === 'true';
const isServerAvailable = res?.stdout?.toString().trim().indexOf('true') > -1;
if (!isServerAvailable) {
return '';
}

View File

@ -40,11 +40,18 @@ export async function recordOutputsHash(_outputs: string[], hash: string) {
export async function outputsHashesMatch(_outputs: string[], hash: string) {
const outputs = await normalizeOutputs(_outputs);
if (outputs.length !== numberOfExpandedOutputs[hash]) return false;
for (const output of outputs) {
if (recordedHashes[output] !== hash) return false;
let invalidated = [];
if (outputs.length !== numberOfExpandedOutputs[hash]) {
invalidated = outputs;
} else {
for (const output of outputs) {
if (recordedHashes[output] !== hash) {
invalidated.push(output);
}
}
}
return true;
await removeSubscriptionsForOutputs(invalidated);
return invalidated.length === 0;
}
function anyErrorsAssociatedWithOutputs(outputs: string[]) {

View File

@ -43,6 +43,7 @@ export type HandlerResult = {
};
const server = createServer(async (socket) => {
serverLogger.log('Established a connection');
resetInactivityTimeout(handleInactivityTimeout);
if (!performanceObserver) {
performanceObserver = new PerformanceObserver((list) => {
@ -60,8 +61,18 @@ const server = createServer(async (socket) => {
} else {
message += chunk.substring(0, chunk.length - 1);
await handleMessage(socket, message);
message = '';
}
});
socket.on('error', (e) => {
serverLogger.log('Socket error');
console.error(e);
});
socket.on('close', () => {
serverLogger.log('Closed a connection');
});
});
async function handleMessage(socket, data) {
@ -94,7 +105,12 @@ async function handleMessage(socket, data) {
);
}
if (payload.type === 'REQUEST_PROJECT_GRAPH') {
if (payload.type === 'PING') {
await handleResult(socket, {
response: JSON.stringify(true),
description: 'ping',
});
} else if (payload.type === 'REQUEST_PROJECT_GRAPH') {
await handleResult(socket, await handleRequestProjectGraph());
} else if (payload.type === 'PROCESS_IN_BACKGROUND') {
await handleResult(socket, await handleProcessInBackground(payload));

View File

@ -46,16 +46,14 @@ export function respondToClient(
description: string
) {
return new Promise(async (res) => {
socket.write(response, (err) => {
if (description) {
serverLogger.requestLog(`Responding to the client.`, description);
}
if (description) {
serverLogger.requestLog(`Responding to the client.`, description);
}
socket.write(`${response}${String.fromCodePoint(4)}`, (err) => {
if (err) {
console.error(err);
}
// Close the connection once all data has been written so that the client knows when to read it.
socket.end();
serverLogger.log(`Closed Connection to Client`, description);
serverLogger.log(`Done responding to the client`, description);
res(null);
});
});

View File

@ -11,7 +11,7 @@ import {
ProjectConfiguration,
ProjectsConfigurations,
} from '../config/workspace-json-project-json';
import { DaemonClient } from '../daemon/client/client';
import { daemonClient } from '../daemon/client/client';
/**
* Synchronously reads the latest cached copy of the workspace's ProjectGraph.
@ -120,9 +120,7 @@ function handleProjectGraphError(opts: { exitOnError: boolean }, e) {
export async function createProjectGraphAsync(
opts: { exitOnError: boolean } = { exitOnError: false }
): Promise<ProjectGraph> {
const nxJson = readNxJson();
const daemon = new DaemonClient(nxJson);
if (!daemon.enabled()) {
if (!daemonClient.enabled()) {
try {
return await buildProjectGraphWithoutDaemon();
} catch (e) {
@ -130,7 +128,7 @@ export async function createProjectGraphAsync(
}
} else {
try {
return await daemon.getProjectGraph();
return await daemonClient.getProjectGraph();
} catch (e) {
if (!e.internalDaemonError) {
handleProjectGraphError(opts, e);

View File

@ -27,7 +27,7 @@ import { handleErrors } from '../utils/params';
import { Workspaces } from 'nx/src/config/workspaces';
import { Hasher } from 'nx/src/hasher/hasher';
import { hashDependsOnOtherTasks, hashTask } from 'nx/src/hasher/hash-task';
import { DaemonClient } from '../daemon/client/client';
import { daemonClient } from '../daemon/client/client';
async function getTerminalOutputLifeCycle(
initiatingProject: string,
@ -197,7 +197,7 @@ export async function runCommand(
nxArgs,
taskGraph,
hasher,
daemon: new DaemonClient(nxJson),
daemon: daemonClient,
}
);
let anyFailures;

View File

@ -1,3 +1,15 @@
let jsonFileOverrides: Record<string, any> = {};
jest.mock('nx/src/utils/fileutils', () => ({
...(jest.requireActual('nx/src/utils/fileutils') as any),
readJsonFile: (path) => {
if (path.endsWith('nx.json')) return {};
if (!(path in jsonFileOverrides))
throw new Error('Tried to read non-mocked json file: ' + path);
return jsonFileOverrides[path];
},
}));
import { PackageJson } from './package-json';
import { ProjectGraph } from '../config/project-graph';
import {
@ -6,17 +18,6 @@ import {
mergeNpmScriptsWithTargets,
} from './project-graph-utils';
jest.mock('nx/src/utils/fileutils', () => ({
...(jest.requireActual('nx/src/utils/fileutils') as any),
readJsonFile: (path) => {
if (!(path in jsonFileOverrides))
throw new Error('Tried to read non-mocked json file: ' + path);
return jsonFileOverrides[path];
},
}));
let jsonFileOverrides: Record<string, any> = {};
describe('project graph utils', () => {
describe('getSourceDirOfDependentProjects', () => {
const projGraph: ProjectGraph = {

View File

@ -0,0 +1,49 @@
import { PromisedBasedQueue } from './promised-based-queue';
describe('PromisedBasedQueue', () => {
it('should executing functions in order', async () => {
const queue = new PromisedBasedQueue();
const log = [];
const res = [];
res.push(
await queue.sendToQueue(async () => {
log.push('1');
await wait(100);
log.push('2');
return 100;
})
);
res.push(
await queue.sendToQueue(async () => {
log.push('3');
return 200;
})
);
expect(log).toEqual(['1', '2', '3']);
expect(res).toEqual([100, 200]);
});
it('should handle errors', async () => {
const queue = new PromisedBasedQueue();
try {
await queue.sendToQueue(async () => {
throw new Error('1');
});
expect('fail').toBeTruthy();
} catch (e) {
expect(e.message).toEqual('1');
}
expect(
await queue.sendToQueue(async () => {
return 100;
})
).toEqual(100);
});
});
function wait(millis: number) {
return new Promise((res) => {
setTimeout(() => res(null), millis);
});
}

View File

@ -0,0 +1,28 @@
export class PromisedBasedQueue {
private promise = Promise.resolve(null);
sendToQueue(fn: () => Promise<any>): Promise<any> {
let res, rej;
const r = new Promise((_res, _rej) => {
res = _res;
rej = _rej;
});
this.promise = this.promise
.then(async () => {
try {
res(await fn());
} catch (e) {
rej(e);
}
})
.catch(async () => {
try {
res(await fn());
} catch (e) {
rej(e);
}
});
return r;
}
}