diff --git a/Cargo.lock b/Cargo.lock
index a89dbfbe..4da985a3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1454,6 +1454,12 @@ version = "1.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 
+[[package]]
+name = "foldhash"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
+
 [[package]]
 name = "form_urlencoded"
 version = "1.2.1"
@@ -1731,6 +1737,11 @@ name = "hashbrown"
 version = "0.15.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
+dependencies = [
+ "allocator-api2",
+ "equivalent",
+ "foldhash",
+]
 
 [[package]]
 name = "hashlink"
@@ -2253,6 +2264,15 @@ dependencies = [
  "logos-codegen",
 ]
 
+[[package]]
+name = "lru"
+version = "0.12.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
+dependencies = [
+ "hashbrown 0.15.2",
+]
+
 [[package]]
 name = "lsp-types"
 version = "0.94.1"
@@ -3066,10 +3086,10 @@ dependencies = [
  "biome_js_factory",
  "biome_js_syntax",
  "biome_rowan",
- "dashmap 5.5.3",
  "futures",
  "globset",
  "ignore",
+ "lru",
  "pgt_analyse",
  "pgt_analyser",
  "pgt_completions",
diff --git a/crates/pgt_workspace/Cargo.toml b/crates/pgt_workspace/Cargo.toml
index 6b0cc065..f535e505 100644
--- a/crates/pgt_workspace/Cargo.toml
+++ b/crates/pgt_workspace/Cargo.toml
@@ -13,9 +13,9 @@ version = "0.0.0"
 
 [dependencies]
 biome_deserialize = "0.6.0"
-dashmap = "5.5.3"
 futures = "0.3.31"
 globset = "0.4.16"
+lru = "0.12"
 ignore = { workspace = true }
 pgt_analyse = { workspace = true, features = ["serde"] }
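Note: the dependency swap above replaces `dashmap` with `lru` (which pulls in `hashbrown`/`foldhash` transitively). For readers unfamiliar with the crate, here is a minimal, standalone sketch of the lru 0.12 API that the stores below rely on; it is illustration only, not part of the patch:

```rust
use std::num::NonZeroUsize;

use lru::LruCache;

fn main() {
    // Capacity must be non-zero; the stores in this patch all use a fixed 1000.
    let mut cache: LruCache<&str, u64> = LruCache::new(NonZeroUsize::new(2).unwrap());

    cache.put("a", 1);
    cache.put("b", 2);

    // `get` takes `&mut self` because it bumps the entry to most-recently-used.
    // That mutability is why every store below wraps its LruCache in a Mutex,
    // even on the read path.
    assert_eq!(cache.get(&"a"), Some(&1));

    // Inserting past capacity evicts the least-recently-used entry ("b").
    cache.put("c", 3);
    assert!(cache.get(&"b").is_none());
}
```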
diff --git a/crates/pgt_workspace/src/workspace/server.rs b/crates/pgt_workspace/src/workspace/server.rs
index 81aa99ab..f7ace3c2 100644
--- a/crates/pgt_workspace/src/workspace/server.rs
+++ b/crates/pgt_workspace/src/workspace/server.rs
@@ -1,4 +1,5 @@
 use std::{
+    collections::HashMap,
     fs,
     panic::RefUnwindSafe,
     path::{Path, PathBuf},
@@ -8,7 +9,6 @@ use std::{
 use analyser::AnalyserVisitorBuilder;
 use async_helper::run_async;
 use connection_manager::ConnectionManager;
-use dashmap::DashMap;
 use document::{
     AsyncDiagnosticsMapper, CursorPositionFilter, DefaultMapper, Document, ExecuteStatementMapper,
     SyncDiagnosticsMapper,
@@ -67,7 +67,7 @@ pub(super) struct WorkspaceServer {
     /// Stores the schema cache for this workspace
     schema_cache: SchemaCacheManager,
 
-    documents: DashMap<PgTPath, Document>,
+    documents: RwLock<HashMap<PgTPath, Document>>,
 
     connection: ConnectionManager,
 }
@@ -89,7 +89,7 @@ impl WorkspaceServer {
     pub(crate) fn new() -> Self {
         Self {
             settings: RwLock::default(),
-            documents: DashMap::default(),
+            documents: RwLock::new(HashMap::new()),
             schema_cache: SchemaCacheManager::new(),
             connection: ConnectionManager::new(),
         }
@@ -262,7 +262,8 @@ impl Workspace for WorkspaceServer {
     /// Add a new file to the workspace
     #[tracing::instrument(level = "info", skip_all, fields(path = params.path.as_path().as_os_str().to_str()), err)]
     fn open_file(&self, params: OpenFileParams) -> Result<(), WorkspaceError> {
-        self.documents
+        let mut documents = self.documents.write().unwrap();
+        documents
             .entry(params.path.clone())
             .or_insert_with(|| Document::new(params.content, params.version));
 
@@ -275,7 +276,8 @@ impl Workspace for WorkspaceServer {
     /// Remove a file from the workspace
     fn close_file(&self, params: super::CloseFileParams) -> Result<(), WorkspaceError> {
-        self.documents
+        let mut documents = self.documents.write().unwrap();
+        documents
             .remove(&params.path)
             .ok_or_else(WorkspaceError::not_found)?;
 
@@ -288,13 +290,15 @@ impl Workspace for WorkspaceServer {
         version = params.version
     ), err)]
     fn change_file(&self, params: super::ChangeFileParams) -> Result<(), WorkspaceError> {
-        match self.documents.entry(params.path.clone()) {
-            dashmap::mapref::entry::Entry::Occupied(mut entry) => {
+        let mut documents = self.documents.write().unwrap();
+
+        match documents.entry(params.path.clone()) {
+            std::collections::hash_map::Entry::Occupied(mut entry) => {
                 entry
                     .get_mut()
                     .update_content(params.content, params.version);
             }
-            dashmap::mapref::entry::Entry::Vacant(entry) => {
+            std::collections::hash_map::Entry::Vacant(entry) => {
                 entry.insert(Document::new(params.content, params.version));
             }
         }
@@ -307,8 +311,8 @@ impl Workspace for WorkspaceServer {
     }
 
     fn get_file_content(&self, params: GetFileContentParams) -> Result<String, WorkspaceError> {
-        let document = self
-            .documents
+        let documents = self.documents.read().unwrap();
+        let document = documents
             .get(&params.path)
             .ok_or(WorkspaceError::not_found())?;
         Ok(document.get_document_content().to_string())
     }
@@ -322,8 +326,8 @@ impl Workspace for WorkspaceServer {
         &self,
         params: code_actions::CodeActionsParams,
     ) -> Result<code_actions::CodeActionsResult, WorkspaceError> {
-        let parser = self
-            .documents
+        let documents = self.documents.read().unwrap();
+        let parser = documents
             .get(&params.path)
             .ok_or(WorkspaceError::not_found())?;
 
@@ -364,8 +368,8 @@ impl Workspace for WorkspaceServer {
         &self,
         params: ExecuteStatementParams,
     ) -> Result<ExecuteStatementResult, WorkspaceError> {
-        let parser = self
-            .documents
+        let documents = self.documents.read().unwrap();
+        let parser = documents
             .get(&params.path)
             .ok_or(WorkspaceError::not_found())?;
 
@@ -422,8 +426,8 @@ impl Workspace for WorkspaceServer {
             }
         };
 
-        let doc = self
-            .documents
+        let documents = self.documents.read().unwrap();
+        let doc = documents
             .get(&params.path)
             .ok_or(WorkspaceError::not_found())?;
 
@@ -607,8 +611,8 @@ impl Workspace for WorkspaceServer {
         &self,
         params: GetCompletionsParams,
     ) -> Result<CompletionsResult, WorkspaceError> {
-        let parsed_doc = self
-            .documents
+        let documents = self.documents.read().unwrap();
+        let parsed_doc = documents
            .get(&params.path)
            .ok_or(WorkspaceError::not_found())?;
 
@@ -621,7 +625,7 @@ impl Workspace for WorkspaceServer {
 
         let schema_cache = self.schema_cache.load(pool)?;
 
-        match get_statement_for_completions(&parsed_doc, params.position) {
+        match get_statement_for_completions(parsed_doc, params.position) {
             None => {
                 tracing::debug!("No statement found.");
                 Ok(CompletionsResult::default())
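Note: with `DashMap`, each `get` returned a per-entry guard; with `RwLock<HashMap<..>>` the guard covers the whole map. That is why every accessor above binds the guard to a local (`let documents = ...`) before indexing into it, and why `get_statement_for_completions` can now take `parsed_doc` directly while the read guard is alive. A self-contained sketch of the pattern, using hypothetical `Doc`/`Server` stand-ins rather than the crate's real types:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

struct Doc {
    content: String,
}

struct Server {
    documents: RwLock<HashMap<String, Doc>>,
}

impl Server {
    // Readers share the lock. `unwrap` turns lock poisoning into a panic,
    // the same policy the patch uses.
    fn content(&self, path: &str) -> Option<String> {
        let documents = self.documents.read().unwrap();
        documents.get(path).map(|doc| doc.content.clone())
    }

    // Writers take the exclusive lock; `entry` keeps the insert-if-absent
    // behavior the dashmap version had.
    fn open(&self, path: &str, content: &str) {
        let mut documents = self.documents.write().unwrap();
        documents.entry(path.to_string()).or_insert_with(|| Doc {
            content: content.to_string(),
        });
    }
}

fn main() {
    let server = Server {
        documents: RwLock::new(HashMap::new()),
    };
    server.open("a.sql", "select 1;");
    assert_eq!(server.content("a.sql").as_deref(), Some("select 1;"));
}
```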
diff --git a/crates/pgt_workspace/src/workspace/server/annotation.rs b/crates/pgt_workspace/src/workspace/server/annotation.rs
index 20710521..0ff6cc0a 100644
--- a/crates/pgt_workspace/src/workspace/server/annotation.rs
+++ b/crates/pgt_workspace/src/workspace/server/annotation.rs
@@ -1,17 +1,20 @@
-use std::sync::Arc;
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
 
-use dashmap::DashMap;
+use lru::LruCache;
 use pgt_lexer::SyntaxKind;
 
 use super::statement_identifier::StatementId;
 
+const DEFAULT_CACHE_SIZE: usize = 1000;
+
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct StatementAnnotations {
     ends_with_semicolon: bool,
 }
 
 pub struct AnnotationStore {
-    db: DashMap<StatementId, Arc<StatementAnnotations>>,
+    db: Mutex<LruCache<StatementId, Arc<StatementAnnotations>>>,
 }
 
 const WHITESPACE_TOKENS: [SyntaxKind; 6] = [
@@ -25,7 +28,11 @@ const WHITESPACE_TOKENS: [SyntaxKind; 6] = [
 
 impl AnnotationStore {
     pub fn new() -> AnnotationStore {
-        AnnotationStore { db: DashMap::new() }
+        AnnotationStore {
+            db: Mutex::new(LruCache::new(
+                NonZeroUsize::new(DEFAULT_CACHE_SIZE).unwrap(),
+            )),
+        }
     }
 
     #[allow(unused)]
@@ -34,8 +41,10 @@ impl AnnotationStore {
         statement_id: &StatementId,
         content: &str,
     ) -> Arc<StatementAnnotations> {
-        if let Some(existing) = self.db.get(statement_id).map(|x| x.clone()) {
-            return existing;
+        let mut cache = self.db.lock().unwrap();
+
+        if let Some(existing) = cache.get(statement_id) {
+            return existing.clone();
         }
 
         let lexed = pgt_lexer::lex(content);
@@ -51,7 +60,7 @@ impl AnnotationStore {
             ends_with_semicolon,
         });
 
-        self.db.insert(statement_id.clone(), annotations.clone());
+        cache.put(statement_id.clone(), annotations.clone());
 
         annotations
     }
diff --git a/crates/pgt_workspace/src/workspace/server/connection_manager.rs b/crates/pgt_workspace/src/workspace/server/connection_manager.rs
index d21988f0..8955b378 100644
--- a/crates/pgt_workspace/src/workspace/server/connection_manager.rs
+++ b/crates/pgt_workspace/src/workspace/server/connection_manager.rs
@@ -1,6 +1,7 @@
+use std::collections::HashMap;
+use std::sync::RwLock;
 use std::time::{Duration, Instant};
 
-use dashmap::DashMap;
 use sqlx::{PgPool, Postgres, pool::PoolOptions, postgres::PgConnectOptions};
 
 use crate::settings::DatabaseSettings;
@@ -16,13 +17,13 @@ struct CachedPool {
 
 #[derive(Default)]
 pub struct ConnectionManager {
-    pools: DashMap<ConnectionKey, CachedPool>,
+    pools: RwLock<HashMap<ConnectionKey, CachedPool>>,
 }
 
 impl ConnectionManager {
     pub fn new() -> Self {
         Self {
-            pools: DashMap::new(),
+            pools: RwLock::new(HashMap::new()),
        }
    }
@@ -41,8 +42,19 @@ impl ConnectionManager {
             return None;
         }
 
-        // If we have a cached pool, update its last_accessed time and return it
-        if let Some(mut cached_pool) = self.pools.get_mut(&key) {
+        // Try the read lock first for the common cache-hit path
+        if let Ok(pools) = self.pools.read() {
+            if let Some(cached_pool) = pools.get(&key) {
+                // last_accessed can't be refreshed under a read lock; a slightly stale timestamp is an acceptable trade-off
+                return Some(cached_pool.pool.clone());
+            }
+        }
+
+        // Cache miss, or the timestamp needs updating - take the write lock
+        let mut pools = self.pools.write().unwrap();
+
+        // Double-check after acquiring the write lock
+        if let Some(cached_pool) = pools.get_mut(&key) {
             cached_pool.last_accessed = Instant::now();
             return Some(cached_pool.pool.clone());
         }
@@ -69,7 +81,7 @@ impl ConnectionManager {
             idle_timeout: Duration::from_secs(60 * 5),
         };
 
-        self.pools.insert(key, cached_pool);
+        pools.insert(key, cached_pool);
 
         Some(pool)
     }
@@ -78,8 +90,10 @@ impl ConnectionManager {
     fn cleanup_idle_pools(&self, ignore_key: &ConnectionKey) {
         let now = Instant::now();
 
+        let mut pools = self.pools.write().unwrap();
+
         // Use retain to keep only non-idle connections
-        self.pools.retain(|key, cached_pool| {
+        pools.retain(|key, cached_pool| {
             let idle_duration = now.duration_since(cached_pool.last_accessed);
             if idle_duration > cached_pool.idle_timeout && key != ignore_key {
                 tracing::debug!(
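Note: the read-then-write sequence in `get_pool` is the classic double-checked pattern: between releasing the read lock and acquiring the write lock, another thread may have inserted the same key, so the lookup is repeated under the write lock. A generic sketch of the idea, with a hypothetical `make` factory:

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::RwLock;

fn get_or_insert<K, V, F>(map: &RwLock<HashMap<K, V>>, key: K, make: F) -> V
where
    K: Hash + Eq,
    V: Clone,
    F: FnOnce() -> V,
{
    // Fast path: shared read lock; the guard drops at the end of the `if let`.
    if let Some(v) = map.read().unwrap().get(&key) {
        return v.clone();
    }

    // Slow path: exclusive write lock.
    let mut guard = map.write().unwrap();

    // Double-check: another thread may have inserted while no lock was held.
    if let Some(v) = guard.get(&key) {
        return v.clone();
    }

    let v = make();
    guard.insert(key, v.clone());
    v
}

fn main() {
    let map = RwLock::new(HashMap::new());
    let v = get_or_insert(&map, "key", || String::from("value"));
    assert_eq!(v, "value");
}
```

One trade-off visible in the diff itself: a hit under the read lock cannot refresh `last_accessed`, so a pool that is only ever read could eventually look idle to `cleanup_idle_pools`; the `ignore_key` check protects only the currently requested key.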
diff --git a/crates/pgt_workspace/src/workspace/server/pg_query.rs b/crates/pgt_workspace/src/workspace/server/pg_query.rs
index 45af96e7..6f1fa2c1 100644
--- a/crates/pgt_workspace/src/workspace/server/pg_query.rs
+++ b/crates/pgt_workspace/src/workspace/server/pg_query.rs
@@ -1,29 +1,38 @@
-use std::sync::Arc;
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
 
-use dashmap::DashMap;
+use lru::LruCache;
 use pgt_query_ext::diagnostics::*;
 
 use super::statement_identifier::StatementId;
 
+const DEFAULT_CACHE_SIZE: usize = 1000;
+
 pub struct PgQueryStore {
-    db: DashMap<StatementId, Arc<Result<pgt_query_ext::NodeEnum, SyntaxDiagnostic>>>,
+    db: Mutex<LruCache<StatementId, Arc<Result<pgt_query_ext::NodeEnum, SyntaxDiagnostic>>>>,
 }
 
 impl PgQueryStore {
     pub fn new() -> PgQueryStore {
-        PgQueryStore { db: DashMap::new() }
+        PgQueryStore {
+            db: Mutex::new(LruCache::new(
+                NonZeroUsize::new(DEFAULT_CACHE_SIZE).unwrap(),
+            )),
+        }
     }
 
     pub fn get_or_cache_ast(
         &self,
         statement: &StatementId,
     ) -> Arc<Result<pgt_query_ext::NodeEnum, SyntaxDiagnostic>> {
-        if let Some(existing) = self.db.get(statement).map(|x| x.clone()) {
-            return existing;
+        let mut cache = self.db.lock().unwrap();
+
+        if let Some(existing) = cache.get(statement) {
+            return existing.clone();
         }
 
         let r = Arc::new(pgt_query_ext::parse(statement.content()).map_err(SyntaxDiagnostic::from));
-        self.db.insert(statement.clone(), r.clone());
+        cache.put(statement.clone(), r.clone());
         r
     }
 }
diff --git a/crates/pgt_workspace/src/workspace/server/schema_cache_manager.rs b/crates/pgt_workspace/src/workspace/server/schema_cache_manager.rs
index b42dfc34..007ebb78 100644
--- a/crates/pgt_workspace/src/workspace/server/schema_cache_manager.rs
+++ b/crates/pgt_workspace/src/workspace/server/schema_cache_manager.rs
@@ -1,6 +1,6 @@
-use std::sync::Arc;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
 
-use dashmap::DashMap;
 use pgt_schema_cache::SchemaCache;
 use sqlx::PgPool;
 
@@ -10,38 +10,41 @@ use super::{async_helper::run_async, connection_key::ConnectionKey};
 
 #[derive(Default)]
 pub struct SchemaCacheManager {
-    schemas: DashMap<ConnectionKey, Arc<SchemaCache>>,
+    schemas: RwLock<HashMap<ConnectionKey, Arc<SchemaCache>>>,
 }
 
 impl SchemaCacheManager {
     pub fn new() -> Self {
         Self {
-            schemas: DashMap::new(),
+            schemas: RwLock::new(HashMap::new()),
         }
     }
 
     pub fn load(&self, pool: PgPool) -> Result<Arc<SchemaCache>, WorkspaceError> {
         let key: ConnectionKey = (&pool).into();
 
-        if let Some(cache) = self.schemas.get(&key) {
-            return Ok(Arc::clone(&*cache));
+        // Try the read lock first for the common cache-hit path
+        if let Ok(schemas) = self.schemas.read() {
+            if let Some(cache) = schemas.get(&key) {
+                return Ok(Arc::clone(cache));
+            }
         }
 
-        let schema_cache = self
-            .schemas
-            .entry(key)
-            .or_try_insert_with::<WorkspaceError>(|| {
-                // This closure will only be called once per key if multiple threads
-                // try to access the same key simultaneously
-                let pool_clone = pool.clone();
-                let schema_cache =
-                    Arc::new(run_async(
-                        async move { SchemaCache::load(&pool_clone).await },
-                    )??);
-
-                Ok(schema_cache)
-            })?;
-
-        Ok(Arc::clone(&schema_cache))
+        // Cache miss - take the write lock to insert
+        let mut schemas = self.schemas.write().unwrap();
+
+        // Double-check after acquiring the write lock
+        if let Some(cache) = schemas.get(&key) {
+            return Ok(Arc::clone(cache));
+        }
+
+        // Load the schema cache
+        let pool_clone = pool.clone();
+        let schema_cache = Arc::new(run_async(
+            async move { SchemaCache::load(&pool_clone).await },
+        )??);
+
+        schemas.insert(key, schema_cache.clone());
+        Ok(schema_cache)
     }
 }
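Note: `PgQueryStore` caches the whole `Result` behind an `Arc`, so failed parses are memoized just like successful ones and repeated diagnostics requests don't re-parse a broken statement. A compact sketch of that get-or-compute shape, with a hypothetical string-keyed `Store` in place of the real `StatementId`-keyed one:

```rust
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use lru::LruCache;

struct Store {
    db: Mutex<LruCache<String, Arc<Result<u64, String>>>>,
}

impl Store {
    fn get_or_compute(&self, key: &str) -> Arc<Result<u64, String>> {
        // A single lock spans lookup and insert, so concurrent callers cannot
        // interleave between the miss and the `put`.
        let mut cache = self.db.lock().unwrap();

        if let Some(existing) = cache.get(key) {
            return existing.clone();
        }

        // The Arc wraps the whole Result: errors are cached too.
        let value = Arc::new(key.parse::<u64>().map_err(|e| e.to_string()));
        cache.put(key.to_string(), value.clone());
        value
    }
}

fn main() {
    let store = Store {
        db: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())),
    };
    assert!(store.get_or_compute("42").is_ok());
    assert!(store.get_or_compute("nope").is_err());
}
```

Unlike the tree-sitter store below, the lock here is held across the parse; the contrast is the interesting part of the next file.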
diff --git a/crates/pgt_workspace/src/workspace/server/tree_sitter.rs b/crates/pgt_workspace/src/workspace/server/tree_sitter.rs
index 2cd73133..b8f62b63 100644
--- a/crates/pgt_workspace/src/workspace/server/tree_sitter.rs
+++ b/crates/pgt_workspace/src/workspace/server/tree_sitter.rs
@@ -1,11 +1,14 @@
+use std::num::NonZeroUsize;
 use std::sync::{Arc, Mutex};
 
-use dashmap::DashMap;
+use lru::LruCache;
 
 use super::statement_identifier::StatementId;
 
+const DEFAULT_CACHE_SIZE: usize = 1000;
+
 pub struct TreeSitterStore {
-    db: DashMap<StatementId, Arc<tree_sitter::Tree>>,
+    db: Mutex<LruCache<StatementId, Arc<tree_sitter::Tree>>>,
     parser: Mutex<tree_sitter::Parser>,
 }
@@ -17,20 +20,35 @@ impl TreeSitterStore {
             .expect("Error loading sql language");
 
         TreeSitterStore {
-            db: DashMap::new(),
+            db: Mutex::new(LruCache::new(
+                NonZeroUsize::new(DEFAULT_CACHE_SIZE).unwrap(),
+            )),
             parser: Mutex::new(parser),
         }
     }
 
     pub fn get_or_cache_tree(&self, statement: &StatementId) -> Arc<tree_sitter::Tree> {
-        if let Some(existing) = self.db.get(statement).map(|x| x.clone()) {
-            return existing;
+        let mut cache = self.db.lock().expect("Failed to lock cache");
+
+        if let Some(existing) = cache.get(statement) {
+            return existing.clone();
         }
 
+        // Cache miss - drop the cache lock, parse, then re-acquire to insert
+        drop(cache);
+
         let mut parser = self.parser.lock().expect("Failed to lock parser");
         let tree = Arc::new(parser.parse(statement.content(), None).unwrap());
-        self.db.insert(statement.clone(), tree.clone());
+        drop(parser);
+
+        let mut cache = self.db.lock().expect("Failed to lock cache");
+
+        // Double-check after re-acquiring the lock
+        if let Some(existing) = cache.get(statement) {
+            return existing.clone();
+        }
+
+        cache.put(statement.clone(), tree.clone());
 
         tree
     }
 }
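Note: the tree-sitter store is the one place the patch drops the cache lock around the expensive step and re-checks after re-acquiring it: two threads can race to parse the same statement, and the loser discards its tree in favor of the cached one. A standalone sketch of that lock-splitting shape, with a toy `parse` standing in for tree-sitter:

```rust
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use lru::LruCache;

struct TreeStore {
    db: Mutex<LruCache<String, Arc<String>>>,
    parser: Mutex<()>, // stand-in for Mutex<tree_sitter::Parser>
}

impl TreeStore {
    fn get_or_parse(&self, key: &str) -> Arc<String> {
        let mut cache = self.db.lock().unwrap();
        if let Some(tree) = cache.get(key) {
            return tree.clone();
        }

        // Release the cache lock before the slow step so other statements can
        // still hit the cache while this one parses.
        drop(cache);

        let parser = self.parser.lock().unwrap();
        let tree = Arc::new(key.to_uppercase()); // the "parse"
        drop(parser);

        // Re-acquire and double-check: a racing thread may have parsed the
        // same statement while no lock was held; prefer its cached tree.
        let mut cache = self.db.lock().unwrap();
        if let Some(existing) = cache.get(key) {
            return existing.clone();
        }
        cache.put(key.to_string(), tree.clone());
        tree
    }
}

fn main() {
    let store = TreeStore {
        db: Mutex::new(LruCache::new(NonZeroUsize::new(16).unwrap())),
        parser: Mutex::new(()),
    };
    assert_eq!(store.get_or_parse("select 1;").as_str(), "SELECT 1;");
}
```

The occasional duplicated parse is the price for never holding the cache lock during parsing; for an editor workload, where the same statements are requested repeatedly, that is usually the right trade.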