fix(core): handle concurrent db connections better (#28544)

This commit is contained in:
Jonathan Cammisuli 2024-10-23 15:33:33 -04:00 committed by GitHub
parent 018543c785
commit d3df76f2f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 366 additions and 132 deletions

50
Cargo.lock generated
View File

@ -290,6 +290,15 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "ci_info"
version = "0.14.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "840dbb7bdd1f2c4d434d6b08420ef204e0bfad0ab31a07a80a1248d24cc6e38b"
dependencies = [
"envmnt",
]
[[package]] [[package]]
name = "clang-sys" name = "clang-sys"
version = "1.8.1" version = "1.8.1"
@ -460,6 +469,16 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]]
name = "envmnt"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d73999a2b8871e74c8b8bc23759ee9f3d85011b24fafc91a4b3b5c8cc8185501"
dependencies = [
"fsio",
"indexmap",
]
[[package]] [[package]]
name = "errno" name = "errno"
version = "0.3.8" version = "0.3.8"
@ -545,6 +564,16 @@ dependencies = [
"syn 2.0.53", "syn 2.0.53",
] ]
[[package]]
name = "fs4"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec6fcfb3c0c1d71612528825042261419d5dade9678c39a781e05b63677d9b32"
dependencies = [
"rustix",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "fs_extra" name = "fs_extra"
version = "1.3.0" version = "1.3.0"
@ -560,6 +589,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "fsio"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dad0ce30be0cc441b325c5d705c8b613a0ca0d92b6a8953d41bd236dc09a36d0"
dependencies = [
"dunce",
]
[[package]] [[package]]
name = "funty" name = "funty"
version = "2.0.0" version = "2.0.0"
@ -1030,6 +1068,16 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
]
[[package]] [[package]]
name = "inotify" name = "inotify"
version = "0.9.6" version = "0.9.6"
@ -1467,11 +1515,13 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"assert_fs", "assert_fs",
"ci_info",
"colored", "colored",
"crossbeam-channel", "crossbeam-channel",
"crossterm", "crossterm",
"dashmap", "dashmap",
"dunce", "dunce",
"fs4",
"fs_extra", "fs_extra",
"globset", "globset",
"hashbrown 0.14.5", "hashbrown 0.14.5",

View File

@ -13,6 +13,7 @@ strip = "none"
[dependencies] [dependencies]
anyhow = "1.0.71" anyhow = "1.0.71"
ci_info = "0.14.14"
colored = "2" colored = "2"
crossbeam-channel = '0.5' crossbeam-channel = '0.5'
dashmap = { version = "5.5.3", features = ["rayon"] } dashmap = { version = "5.5.3", features = ["rayon"] }
@ -55,6 +56,7 @@ mio = "0.8"
portable-pty = { git = "https://github.com/cammisuli/wezterm", rev = "b538ee29e1e89eeb4832fb35ae095564dce34c29" } portable-pty = { git = "https://github.com/cammisuli/wezterm", rev = "b538ee29e1e89eeb4832fb35ae095564dce34c29" }
crossterm = "0.27.0" crossterm = "0.27.0"
ignore-files = "2.1.0" ignore-files = "2.1.0"
fs4 = "0.10.0"
rusqlite = { version = "0.32.1", features = ["bundled", "array", "vtab"] } rusqlite = { version = "0.32.1", features = ["bundled", "array", "vtab"] }
watchexec = "3.0.1" watchexec = "3.0.1"
watchexec-events = "2.0.1" watchexec-events = "2.0.1"

View File

@ -5,11 +5,12 @@ use std::time::Instant;
use fs_extra::remove_items; use fs_extra::remove_items;
use napi::bindgen_prelude::*; use napi::bindgen_prelude::*;
use regex::Regex; use regex::Regex;
use rusqlite::{params, Connection, OptionalExtension}; use rusqlite::params;
use tracing::trace; use tracing::trace;
use crate::native::cache::expand_outputs::_expand_outputs; use crate::native::cache::expand_outputs::_expand_outputs;
use crate::native::cache::file_ops::_copy; use crate::native::cache::file_ops::_copy;
use crate::native::db::connection::NxDbConnection;
use crate::native::utils::Normalize; use crate::native::utils::Normalize;
#[napi(object)] #[napi(object)]
@ -25,7 +26,7 @@ pub struct NxCache {
pub cache_directory: String, pub cache_directory: String,
workspace_root: PathBuf, workspace_root: PathBuf,
cache_path: PathBuf, cache_path: PathBuf,
db: External<Connection>, db: External<NxDbConnection>,
link_task_details: bool, link_task_details: bool,
} }
@ -35,7 +36,7 @@ impl NxCache {
pub fn new( pub fn new(
workspace_root: String, workspace_root: String,
cache_path: String, cache_path: String,
db_connection: External<Connection>, db_connection: External<NxDbConnection>,
link_task_details: Option<bool>, link_task_details: Option<bool>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let cache_path = PathBuf::from(&cache_path); let cache_path = PathBuf::from(&cache_path);
@ -58,29 +59,26 @@ impl NxCache {
fn setup(&self) -> anyhow::Result<()> { fn setup(&self) -> anyhow::Result<()> {
let query = if self.link_task_details { let query = if self.link_task_details {
"BEGIN; "CREATE TABLE IF NOT EXISTS cache_outputs (
CREATE TABLE IF NOT EXISTS cache_outputs (
hash TEXT PRIMARY KEY NOT NULL, hash TEXT PRIMARY KEY NOT NULL,
code INTEGER NOT NULL, code INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (hash) REFERENCES task_details (hash) FOREIGN KEY (hash) REFERENCES task_details (hash)
); );
COMMIT;
" "
} else { } else {
"BEGIN; "CREATE TABLE IF NOT EXISTS cache_outputs (
CREATE TABLE IF NOT EXISTS cache_outputs (
hash TEXT PRIMARY KEY NOT NULL, hash TEXT PRIMARY KEY NOT NULL,
code INTEGER NOT NULL, code INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
); );
COMMIT;
" "
}; };
self.db.execute_batch(query).map_err(anyhow::Error::from) self.db.execute(query, []).map_err(anyhow::Error::from)?;
Ok(())
} }
#[napi] #[napi]
@ -114,8 +112,7 @@ impl NxCache {
}) })
}, },
) )
.optional() .map_err(|e| anyhow::anyhow!("Unable to get {}: {:?}", &hash, e))?;
.map_err(anyhow::Error::new)?;
trace!("GET {} {:?}", &hash, start.elapsed()); trace!("GET {} {:?}", &hash, start.elapsed());
Ok(r) Ok(r)
} }
@ -157,7 +154,6 @@ impl NxCache {
} }
} }
trace!("Recording to cache: {:?}", &hash);
self.record_to_cache(hash, code)?; self.record_to_cache(hash, code)?;
Ok(()) Ok(())
} }
@ -168,6 +164,11 @@ impl NxCache {
hash: String, hash: String,
result: CachedResult, result: CachedResult,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
trace!(
"applying remote cache results: {:?} ({})",
&hash,
&result.outputs_path
);
let terminal_output = result.terminal_output; let terminal_output = result.terminal_output;
write(self.get_task_outputs_path(hash.clone()), terminal_output)?; write(self.get_task_outputs_path(hash.clone()), terminal_output)?;
@ -187,6 +188,7 @@ impl NxCache {
} }
fn record_to_cache(&self, hash: String, code: i16) -> anyhow::Result<()> { fn record_to_cache(&self, hash: String, code: i16) -> anyhow::Result<()> {
trace!("Recording to cache: {}, {}", &hash, code);
self.db.execute( self.db.execute(
"INSERT OR REPLACE INTO cache_outputs (hash, code) VALUES (?1, ?2)", "INSERT OR REPLACE INTO cache_outputs (hash, code) VALUES (?1, ?2)",
params![hash, code], params![hash, code],
@ -235,7 +237,7 @@ impl NxCache {
Ok(vec![ Ok(vec![
self.cache_path.join(&hash), self.cache_path.join(&hash),
self.get_task_outputs_path_internal(&hash).into(), self.get_task_outputs_path_internal(&hash),
]) ])
})? })?
.filter_map(anyhow::Result::ok) .filter_map(anyhow::Result::ok)
@ -252,14 +254,15 @@ impl NxCache {
// Checks that the number of cache records in the database // Checks that the number of cache records in the database
// matches the number of cache directories on the filesystem. // matches the number of cache directories on the filesystem.
// If they don't match, it means that the cache is out of sync. // If they don't match, it means that the cache is out of sync.
let cache_records_exist = let cache_records_exist = self
self.db .db
.query_row("SELECT EXISTS (SELECT 1 FROM cache_outputs)", [], |row| { .query_row("SELECT EXISTS (SELECT 1 FROM cache_outputs)", [], |row| {
let exists: bool = row.get(0)?; let exists: bool = row.get(0)?;
Ok(exists) Ok(exists)
})?; })?
.unwrap_or(false);
if !cache_records_exist { if cache_records_exist {
let hash_regex = Regex::new(r"^\d+$").expect("Hash regex is invalid"); let hash_regex = Regex::new(r"^\d+$").expect("Hash regex is invalid");
let fs_entries = std::fs::read_dir(&self.cache_path).map_err(anyhow::Error::from)?; let fs_entries = std::fs::read_dir(&self.cache_path).map_err(anyhow::Error::from)?;

View File

@ -0,0 +1,75 @@
use anyhow::Result;
use rusqlite::{Connection, Error, OptionalExtension, Params, Row, Statement};
use std::thread;
use std::time::{Duration, Instant};
use tracing::trace;
pub struct NxDbConnection {
pub conn: Connection,
}
impl NxDbConnection {
pub fn new(connection: Connection) -> Self {
Self { conn: connection }
}
pub fn execute<P: Params + Clone>(&self, sql: &str, params: P) -> Result<usize> {
self.retry_on_busy(|conn| conn.execute(sql, params.clone()))
.map_err(|e| anyhow::anyhow!("DB execute error: \"{}\", {:?}", sql, e))
}
pub fn execute_batch(&self, sql: &str) -> Result<()> {
self.retry_on_busy(|conn| conn.execute_batch(sql))
.map_err(|e| anyhow::anyhow!("DB execute batch error: \"{}\", {:?}", sql, e))
}
pub fn prepare(&self, sql: &str) -> Result<Statement> {
self.retry_on_busy(|conn| conn.prepare(sql))
.map_err(|e| anyhow::anyhow!("DB prepare error: \"{}\", {:?}", sql, e))
}
pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<Option<T>>
where
P: Params + Clone,
F: FnOnce(&Row<'_>) -> rusqlite::Result<T> + Clone,
{
self.retry_on_busy(|conn| conn.query_row(sql, params.clone(), f.clone()).optional())
.map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e))
}
pub fn close(self) -> rusqlite::Result<(), (Connection, Error)> {
self.conn
.close()
.inspect_err(|e| trace!("Error in close: {:?}", e))
}
#[allow(clippy::needless_lifetimes)]
fn retry_on_busy<'a, F, T>(&'a self, operation: F) -> rusqlite::Result<T>
where
F: Fn(&'a Connection) -> rusqlite::Result<T>,
{
let start = Instant::now();
let max_retries: u64 = 5;
let retry_delay = Duration::from_millis(25);
for i in 0..max_retries {
match operation(&self.conn) {
Ok(result) => return Ok(result),
Err(Error::SqliteFailure(err, _))
if err.code == rusqlite::ErrorCode::DatabaseBusy =>
{
trace!("Database busy. Retrying{}", ".".repeat(i as usize));
if start.elapsed()
>= Duration::from_millis(max_retries * retry_delay.as_millis() as u64)
{
break;
}
thread::sleep(retry_delay);
}
err @ Err(_) => return err,
}
}
operation(&self.conn)
}
}

View File

@ -0,0 +1,144 @@
use std::fs::{remove_file, File};
use std::path::{Path, PathBuf};
use tracing::{debug, trace};
use rusqlite::{Connection, OpenFlags};
use fs4::fs_std::FileExt;
use crate::native::db::connection::NxDbConnection;
pub(super) struct LockFile {
file: File,
path: PathBuf,
}
pub(super) fn unlock_file(lock_file: &LockFile) {
if lock_file.path.exists() {
lock_file
.file
.unlock()
.and_then(|_| remove_file(&lock_file.path))
.ok();
}
}
pub(super) fn create_lock_file(db_path: &Path) -> anyhow::Result<LockFile> {
let lock_file_path = db_path.with_extension("lock");
let lock_file = File::create(&lock_file_path)
.map_err(|e| anyhow::anyhow!("Unable to create db lock file: {:?}", e))?;
trace!("Getting lock on db lock file");
lock_file
.lock_exclusive()
.inspect(|_| trace!("Got lock on db lock file"))
.map_err(|e| anyhow::anyhow!("Unable to lock the db lock file: {:?}", e))?;
Ok(LockFile {
file: lock_file,
path: lock_file_path,
})
}
pub(super) fn initialize_db(nx_version: String, db_path: &PathBuf) -> anyhow::Result<NxDbConnection> {
let c = create_connection(db_path)?;
trace!(
"Checking if current existing database is compatible with Nx {}",
nx_version
);
let db_version = c.query_row(
"SELECT value FROM metadata WHERE key='NX_VERSION'",
[],
|row| {
let r: String = row.get(0)?;
Ok(r)
},
);
let c = match db_version {
Ok(Some(version)) if version == nx_version => {
trace!("Database is compatible with Nx {}", nx_version);
c
}
// If there is no version, it means that this database is new
Ok(None) => {
trace!("Recording Nx Version: {}", nx_version);
c.execute(
"INSERT OR REPLACE INTO metadata (key, value) VALUES ('NX_VERSION', ?)",
[nx_version],
)?;
c
}
_ => {
trace!("Disconnecting from existing incompatible database");
c.close().map_err(|(_, error)| anyhow::Error::from(error))?;
trace!("Removing existing incompatible database");
remove_file(db_path)?;
trace!("Creating a new connection to a new database");
create_connection(db_path)?
}
};
Ok(c)
}
fn create_connection(db_path: &PathBuf) -> anyhow::Result<NxDbConnection> {
match open_database_connection(db_path) {
Ok(connection) => {
configure_database(&connection)?;
create_metadata_table(&connection)?;
Ok(connection)
}
err @ Err(_) => err,
}
}
fn create_metadata_table(c: &NxDbConnection) -> anyhow::Result<()> {
debug!("Creating table for metadata if it does not exist");
c.execute(
"CREATE TABLE IF NOT EXISTS metadata (
key TEXT NOT NULL PRIMARY KEY,
value TEXT NOT NULL
)",
[],
)?;
Ok(())
}
fn open_database_connection(db_path: &PathBuf) -> anyhow::Result<NxDbConnection> {
let conn = if cfg!(target_family = "unix") && ci_info::is_ci() {
trace!("Opening connection with unix-dotfile");
Connection::open_with_flags_and_vfs(
db_path,
OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_URI
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
"unix-dotfile",
)
} else {
Connection::open_with_flags(
db_path,
OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_URI
| OpenFlags::SQLITE_OPEN_FULL_MUTEX,
)
};
conn.map_err(|e| anyhow::anyhow!("Error creating connection {:?}", e))
.map(NxDbConnection::new)
}
fn configure_database(connection: &NxDbConnection) -> anyhow::Result<()> {
connection
.conn
.pragma_update(None, "journal_mode", "WAL")
.map_err(|e| anyhow::anyhow!("Unable to set journal_mode: {:?}", e))?;
connection
.conn
.pragma_update(None, "synchronous", "NORMAL")
.map_err(|e| anyhow::anyhow!("Unable to set synchronous: {:?}", e))?;
connection
.conn
.busy_handler(Some(|tries| tries < 6))
.map_err(|e| anyhow::anyhow!("Unable to set busy handler: {:?}", e))?;
Ok(())
}

View File

@ -1,88 +1,32 @@
use rusqlite::OpenFlags; pub mod connection;
use std::fs::{create_dir_all, remove_file}; mod initialize;
use std::path::PathBuf;
use std::time::Duration;
use crate::native::db::connection::NxDbConnection;
use crate::native::machine_id::get_machine_id; use crate::native::machine_id::get_machine_id;
use napi::bindgen_prelude::External; use napi::bindgen_prelude::External;
use rusqlite::Connection; use std::fs::create_dir_all;
use tracing::debug; use std::path::PathBuf;
use std::process;
use tracing::{trace, trace_span};
#[napi] #[napi]
pub fn connect_to_nx_db( pub fn connect_to_nx_db(
cache_dir: String, cache_dir: String,
nx_version: String, nx_version: String,
db_name: Option<String>, db_name: Option<String>,
) -> anyhow::Result<External<Connection>> { ) -> anyhow::Result<External<NxDbConnection>> {
let cache_dir_buf = PathBuf::from(cache_dir); let cache_dir_buf = PathBuf::from(cache_dir);
let db_path = cache_dir_buf.join(format!("{}.db", db_name.unwrap_or_else(get_machine_id))); let db_path = cache_dir_buf.join(format!("{}.db", db_name.unwrap_or_else(get_machine_id)));
create_dir_all(cache_dir_buf)?; create_dir_all(cache_dir_buf)?;
let c = create_connection(&db_path)?; let _ = trace_span!("process", id = process::id()).entered();
trace!("Creating connection to {:?}", db_path);
let lock_file = initialize::create_lock_file(&db_path)?;
debug!( let c = initialize::initialize_db(nx_version, &db_path)
"Checking if current existing database is compatible with Nx {}", .inspect_err(|_| initialize::unlock_file(&lock_file))?;
nx_version
);
let db_version = c.query_row(
"SELECT value FROM metadata WHERE key='NX_VERSION'",
[],
|row| {
let r: String = row.get(0)?;
Ok(r)
},
);
let c = match db_version { initialize::unlock_file(&lock_file);
Ok(version) if version == nx_version => c,
_ => {
debug!("Disconnecting from existing incompatible database");
c.close().map_err(|(_, error)| anyhow::Error::from(error))?;
debug!("Removing existing incompatible database");
remove_file(&db_path)?;
debug!("Creating a new connection to a new database");
create_connection(&db_path)?
}
};
debug!("Creating table for metadata");
c.execute(
"CREATE TABLE IF NOT EXISTS metadata (
key TEXT NOT NULL PRIMARY KEY,
value TEXT NOT NULL
)",
[],
)?;
debug!("Recording Nx Version: {}", nx_version);
c.execute(
"INSERT OR REPLACE INTO metadata (key, value) VALUES ('NX_VERSION', ?)",
[nx_version],
)?;
Ok(External::new(c)) Ok(External::new(c))
} }
fn create_connection(db_path: &PathBuf) -> anyhow::Result<Connection> {
debug!("Creating connection to {:?}", db_path);
let c = Connection::open_with_flags(
db_path,
OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_URI
| OpenFlags::SQLITE_OPEN_FULL_MUTEX,
)
.map_err(anyhow::Error::from)?;
// This allows writes at the same time as reads
c.pragma_update(None, "journal_mode", "WAL")?;
// This makes things less synchronous than default
c.pragma_update(None, "synchronous", "NORMAL")?;
c.busy_timeout(Duration::from_millis(25))?;
c.busy_handler(Some(|tries| tries < 6))?;
Ok(c)
}

View File

@ -28,7 +28,7 @@ export declare class ImportResult {
export declare class NxCache { export declare class NxCache {
cacheDirectory: string cacheDirectory: string
constructor(workspaceRoot: string, cachePath: string, dbConnection: ExternalObject<Connection>, linkTaskDetails?: boolean | undefined | null) constructor(workspaceRoot: string, cachePath: string, dbConnection: ExternalObject<NxDbConnection>, linkTaskDetails?: boolean | undefined | null)
get(hash: string): CachedResult | null get(hash: string): CachedResult | null
put(hash: string, terminalOutput: string, outputs: Array<string>, code: number): void put(hash: string, terminalOutput: string, outputs: Array<string>, code: number): void
applyRemoteCacheResults(hash: string, result: CachedResult): void applyRemoteCacheResults(hash: string, result: CachedResult): void
@ -39,7 +39,7 @@ export declare class NxCache {
} }
export declare class NxTaskHistory { export declare class NxTaskHistory {
constructor(db: ExternalObject<Connection>) constructor(db: ExternalObject<NxDbConnection>)
recordTaskRuns(taskRuns: Array<TaskRun>): void recordTaskRuns(taskRuns: Array<TaskRun>): void
getFlakyTasks(hashes: Array<string>): Array<string> getFlakyTasks(hashes: Array<string>): Array<string>
getEstimatedTaskTimings(targets: Array<TaskTarget>): Record<string, number> getEstimatedTaskTimings(targets: Array<TaskTarget>): Record<string, number>
@ -56,7 +56,7 @@ export declare class RustPseudoTerminal {
} }
export declare class TaskDetails { export declare class TaskDetails {
constructor(db: ExternalObject<Connection>) constructor(db: ExternalObject<NxDbConnection>)
recordTaskDetails(tasks: Array<HashedTask>): void recordTaskDetails(tasks: Array<HashedTask>): void
} }
@ -97,7 +97,7 @@ export interface CachedResult {
outputsPath: string outputsPath: string
} }
export declare export function connectToNxDb(cacheDir: string, nxVersion: string, dbName?: string | undefined | null): ExternalObject<Connection> export declare export function connectToNxDb(cacheDir: string, nxVersion: string, dbName?: string | undefined | null): ExternalObject<NxDbConnection>
export declare export function copy(src: string, dest: string): void export declare export function copy(src: string, dest: string): void

View File

@ -1,5 +1,6 @@
use crate::native::db::connection::NxDbConnection;
use napi::bindgen_prelude::*; use napi::bindgen_prelude::*;
use rusqlite::{params, Connection}; use rusqlite::params;
#[napi(object)] #[napi(object)]
#[derive(Default, Clone)] #[derive(Default, Clone)]
@ -11,14 +12,14 @@ pub struct HashedTask {
} }
#[napi] #[napi]
struct TaskDetails { pub struct TaskDetails {
db: External<Connection>, db: External<NxDbConnection>,
} }
#[napi] #[napi]
impl TaskDetails { impl TaskDetails {
#[napi(constructor)] #[napi(constructor)]
pub fn new(db: External<Connection>) -> anyhow::Result<Self> { pub fn new(db: External<NxDbConnection>) -> anyhow::Result<Self> {
let r = Self { db }; let r = Self { db };
r.setup()?; r.setup()?;
@ -28,8 +29,7 @@ impl TaskDetails {
fn setup(&self) -> anyhow::Result<()> { fn setup(&self) -> anyhow::Result<()> {
self.db.execute( self.db.execute(
" "CREATE TABLE IF NOT EXISTS task_details (
CREATE TABLE IF NOT EXISTS task_details (
hash TEXT PRIMARY KEY NOT NULL, hash TEXT PRIMARY KEY NOT NULL,
project TEXT NOT NULL, project TEXT NOT NULL,
target TEXT NOT NULL, target TEXT NOT NULL,

View File

@ -1,11 +1,10 @@
use std::rc::Rc; use crate::native::db::connection::NxDbConnection;
use std::collections::HashMap; use crate::native::tasks::types::TaskTarget;
use itertools::Itertools;
use napi::bindgen_prelude::*; use napi::bindgen_prelude::*;
use rusqlite::vtab::array; use rusqlite::vtab::array;
use rusqlite::{params, types::Value, Connection}; use rusqlite::{params, types::Value};
use std::collections::HashMap;
use crate::native::tasks::types::TaskTarget; use std::rc::Rc;
#[napi(object)] #[napi(object)]
pub struct TaskRun { pub struct TaskRun {
@ -18,13 +17,13 @@ pub struct TaskRun {
#[napi] #[napi]
pub struct NxTaskHistory { pub struct NxTaskHistory {
db: External<Connection>, db: External<NxDbConnection>,
} }
#[napi] #[napi]
impl NxTaskHistory { impl NxTaskHistory {
#[napi(constructor)] #[napi(constructor)]
pub fn new(db: External<Connection>) -> anyhow::Result<Self> { pub fn new(db: External<NxDbConnection>) -> anyhow::Result<Self> {
let s = Self { db }; let s = Self { db };
s.setup()?; s.setup()?;
@ -33,11 +32,11 @@ impl NxTaskHistory {
} }
fn setup(&self) -> anyhow::Result<()> { fn setup(&self) -> anyhow::Result<()> {
array::load_module(&self.db)?; array::load_module(&self.db.conn)?;
self.db self.db
.execute_batch( .execute_batch(
" "
BEGIN; BEGIN IMMEDIATE;
CREATE TABLE IF NOT EXISTS task_history ( CREATE TABLE IF NOT EXISTS task_history (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
hash TEXT NOT NULL, hash TEXT NOT NULL,
@ -99,16 +98,21 @@ impl NxTaskHistory {
} }
#[napi] #[napi]
pub fn get_estimated_task_timings(&self, targets: Vec<TaskTarget>) -> anyhow::Result<HashMap<String, f64>> { pub fn get_estimated_task_timings(
&self,
targets: Vec<TaskTarget>,
) -> anyhow::Result<HashMap<String, f64>> {
let values = Rc::new( let values = Rc::new(
targets targets
.iter() .iter()
.map(|t| Value::from( .map(|t| {
match &t.configuration { Value::from(match &t.configuration {
Some(configuration) => format!("{}:{}:{}", t.project, t.target, configuration), Some(configuration) => {
_ => format!("{}:{}", t.project, t.target) format!("{}:{}:{}", t.project, t.target, configuration)
} }
)) _ => format!("{}:{}", t.project, t.target),
})
})
.collect::<Vec<Value>>(), .collect::<Vec<Value>>(),
); );

View File

@ -3,23 +3,21 @@ import { join } from 'path';
import { TempFs } from '../../internal-testing-utils/temp-fs'; import { TempFs } from '../../internal-testing-utils/temp-fs';
import { rmSync } from 'fs'; import { rmSync } from 'fs';
import { getDbConnection } from '../../utils/db-connection'; import { getDbConnection } from '../../utils/db-connection';
import { randomBytes } from 'crypto';
describe('Cache', () => { describe('Cache', () => {
let cache: NxCache; let cache: NxCache;
let tempFs: TempFs; let tempFs: TempFs;
let taskDetails: TaskDetails; let taskDetails: TaskDetails;
const dbOutputFolder = 'temp-db-cache';
beforeEach(() => { beforeEach(() => {
tempFs = new TempFs('cache'); tempFs = new TempFs('cache');
rmSync(join(__dirname, 'temp-db'), {
recursive: true,
force: true,
});
const dbConnection = getDbConnection({ const dbConnection = getDbConnection({
directory: join(__dirname, 'temp-db'), directory: join(__dirname, dbOutputFolder),
dbName: `temp-db-${randomBytes(4).toString('hex')}`,
}); });
taskDetails = new TaskDetails(dbConnection); taskDetails = new TaskDetails(dbConnection);
cache = new NxCache( cache = new NxCache(
@ -38,6 +36,13 @@ describe('Cache', () => {
]); ]);
}); });
afterAll(() => {
rmSync(join(__dirname, dbOutputFolder), {
recursive: true,
force: true,
});
});
it('should store results into cache', async () => { it('should store results into cache', async () => {
const result = cache.get('123'); const result = cache.get('123');

View File

@ -3,7 +3,9 @@ import { join } from 'path';
import { TempFs } from '../../internal-testing-utils/temp-fs'; import { TempFs } from '../../internal-testing-utils/temp-fs';
import { rmSync } from 'fs'; import { rmSync } from 'fs';
import { getDbConnection } from '../../utils/db-connection'; import { getDbConnection } from '../../utils/db-connection';
import { randomBytes } from 'crypto';
const dbOutputFolder = 'temp-db-task';
describe('NxTaskHistory', () => { describe('NxTaskHistory', () => {
let taskHistory: NxTaskHistory; let taskHistory: NxTaskHistory;
let tempFs: TempFs; let tempFs: TempFs;
@ -12,13 +14,9 @@ describe('NxTaskHistory', () => {
beforeEach(() => { beforeEach(() => {
tempFs = new TempFs('task-history'); tempFs = new TempFs('task-history');
rmSync(join(__dirname, 'temp-db'), {
recursive: true,
force: true,
});
const dbConnection = getDbConnection({ const dbConnection = getDbConnection({
directory: join(__dirname, 'temp-db'), directory: join(__dirname, dbOutputFolder),
dbName: `temp-db-${randomBytes(4).toString('hex')}`,
}); });
taskHistory = new NxTaskHistory(dbConnection); taskHistory = new NxTaskHistory(dbConnection);
taskDetails = new TaskDetails(dbConnection); taskDetails = new TaskDetails(dbConnection);
@ -40,6 +38,13 @@ describe('NxTaskHistory', () => {
]); ]);
}); });
afterAll(() => {
rmSync(join(__dirname, dbOutputFolder), {
recursive: true,
force: true,
});
});
it('should record task history', () => { it('should record task history', () => {
taskHistory.recordTaskRuns([ taskHistory.recordTaskRuns([
{ {

View File

@ -54,6 +54,8 @@ export class DbCache {
private remoteCache: RemoteCacheV2 | null; private remoteCache: RemoteCacheV2 | null;
private remoteCachePromise: Promise<RemoteCacheV2>; private remoteCachePromise: Promise<RemoteCacheV2>;
private isVerbose = process.env.NX_VERBOSE_LOGGING === 'true';
constructor(private readonly options: { nxCloudRemoteCache: RemoteCache }) {} constructor(private readonly options: { nxCloudRemoteCache: RemoteCache }) {}
async init() { async init() {