diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 0c178190b..a452dedca 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -20,6 +20,7 @@ pub mod utils; use futures::{future, stream, StreamExt}; use std::collections::HashSet; +use std::sync::Arc; use std::time::Duration; use actix_web::http::header::{self, HeaderMap}; @@ -31,7 +32,7 @@ use clokwerk::{AsyncScheduler, Interval}; use http::{header as http_header, StatusCode}; use itertools::Itertools; use relative_path::RelativePathBuf; -use serde::de::Error; +use serde::de::{DeserializeOwned, Error}; use serde_json::error::Error as SerdeError; use serde_json::{to_vec, Value as JsonValue}; use tracing::{error, info, warn}; @@ -45,7 +46,8 @@ use crate::rbac::role::model::DefaultPrivilege; use crate::rbac::user::User; use crate::stats::Stats; use crate::storage::{ - ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY, + ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, + STREAM_ROOT_DIRECTORY, }; use crate::HTTP_CLIENT; @@ -642,6 +644,9 @@ async fn fetch_nodes_info( nodes: Vec, ) -> Result, StreamError> { let nodes_len = nodes.len(); + if nodes_len == 0 { + return Ok(vec![]); + } let results = stream::iter(nodes) .map(|node| async move { fetch_node_info(&node).await }) .buffer_unordered(nodes_len) // No concurrency limit @@ -702,52 +707,71 @@ pub async fn get_indexer_info() -> anyhow::Result { Ok(arr) } -pub async fn remove_ingestor(ingestor: Path) -> Result { - let domain_name = to_url_string(ingestor.into_inner()); +pub async fn remove_node(node_url: Path) -> Result { + let domain_name = to_url_string(node_url.into_inner()); if check_liveness(&domain_name).await { return Err(PostError::Invalid(anyhow::anyhow!( - "The ingestor is currently live and cannot be removed" + "The node is currently live and cannot be removed" ))); } let object_store = PARSEABLE.storage.get_object_store(); - let ingestor_metadatas = object_store + // Delete ingestor metadata + let removed_ingestor = + remove_node_metadata::(&object_store, &domain_name).await?; + + // Delete indexer metadata + let removed_indexer = + remove_node_metadata::(&object_store, &domain_name).await?; + + let msg = if removed_ingestor || removed_indexer { + format!("node {} removed successfully", domain_name) + } else { + format!("node {} is not found", domain_name) + }; + + info!("{}", &msg); + Ok((msg, StatusCode::OK)) +} + +// Helper function to remove a specific type of node metadata +async fn remove_node_metadata( + object_store: &Arc, + domain_name: &str, +) -> Result { + let node_type = T::default().node_type().to_string(); + + let metadatas = object_store .get_objects( Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)), - Box::new(|file_name| file_name.starts_with("ingestor")), + Box::new(move |file_name| file_name.starts_with(&node_type)), ) .await?; - let ingestor_metadata = ingestor_metadatas + let node_metadatas = metadatas .iter() - .map(|elem| serde_json::from_slice::(elem).unwrap_or_default()) - .collect_vec(); + .filter_map(|elem| match serde_json::from_slice::(elem) { + Ok(meta) if meta.domain_name() == domain_name => Some(meta), + _ => None, + }) + .collect::>(); - let ingestor_metadata = ingestor_metadata - .iter() - .filter(|elem| elem.domain_name == domain_name) - .collect_vec(); + if node_metadatas.is_empty() { + return Ok(false); + } - let ingestor_meta_filename = ingestor_metadata[0].file_path().to_string(); - let msg = match object_store - .try_delete_ingestor_meta(ingestor_meta_filename) - .await - { - Ok(_) => { - format!("Ingestor {} removed successfully", domain_name) - } + let node_meta_filename = node_metadatas[0].file_path().to_string(); + match object_store.try_delete_node_meta(node_meta_filename).await { + Ok(_) => Ok(true), Err(err) => { if matches!(err, ObjectStorageError::IoError(_)) { - format!("Ingestor {} is not found", domain_name) + Ok(false) } else { - format!("Error removing ingestor {}\n Reason: {}", domain_name, err) + Err(PostError::ObjectStorageError(err)) } } - }; - - info!("{}", &msg); - Ok((msg, StatusCode::OK)) + } } /// Fetches metrics from a node (ingestor or indexer) diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 3a2f236c4..a0774f36b 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -553,6 +553,7 @@ pub trait Metadata { fn domain_name(&self) -> &str; fn token(&self) -> &str; fn node_type(&self) -> &str; + fn file_path(&self) -> RelativePathBuf; } impl Metadata for IngestorMetadata { @@ -566,6 +567,9 @@ impl Metadata for IngestorMetadata { fn node_type(&self) -> &str { "ingestor" } + fn file_path(&self) -> RelativePathBuf { + self.file_path() + } } impl Metadata for IndexerMetadata { @@ -579,6 +583,9 @@ impl Metadata for IndexerMetadata { fn node_type(&self) -> &str { "indexer" } + fn file_path(&self) -> RelativePathBuf { + self.file_path() + } } #[cfg(test)] mod test { diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index dde519d2e..d4ce26f72 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -333,13 +333,13 @@ impl QueryServer { .authorize(Action::ListClusterMetrics), ), ) - // DELETE "/cluster/{ingestor_domain:port}" ==> Delete an ingestor from the cluster + // DELETE "/cluster/{node_domain:port}" ==> Delete a node from the cluster .service( - web::scope("/{ingestor}").service( + web::scope("/{node_url}").service( web::resource("").route( web::delete() - .to(cluster::remove_ingestor) - .authorize(Action::Deleteingestor), + .to(cluster::remove_node) + .authorize(Action::DeleteNode), ), ), ) diff --git a/src/rbac/role.rs b/src/rbac/role.rs index 00208631c..ee5d5d6b9 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -50,7 +50,7 @@ pub enum Action { QueryLLM, ListCluster, ListClusterMetrics, - Deleteingestor, + DeleteNode, All, GetAnalytics, ListDashboard, @@ -128,7 +128,7 @@ impl RoleBuilder { | Action::DeleteCorrelation | Action::GetCorrelation | Action::PutCorrelation - | Action::Deleteingestor + | Action::DeleteNode | Action::PutHotTierEnabled | Action::GetHotTierEnabled | Action::DeleteHotTierEnabled diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index a7c6709ce..bcd8d2a51 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -644,11 +644,8 @@ impl ObjectStorage for BlobStore { Ok(()) } - async fn try_delete_ingestor_meta( - &self, - ingestor_filename: String, - ) -> Result<(), ObjectStorageError> { - let file = RelativePathBuf::from(&ingestor_filename); + async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> { + let file = RelativePathBuf::from(&node_filename); match self.client.delete(&to_object_store_path(&file)).await { Ok(_) => Ok(()), Err(err) => { @@ -658,7 +655,7 @@ impl ObjectStorage for BlobStore { error!("Node does not exist"); Err(err.into()) } else { - error!("Error deleting ingestor meta file: {:?}", err); + error!("Error deleting node meta file: {:?}", err); Err(err.into()) } } diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 1166ff31a..3b79f4028 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -325,11 +325,8 @@ impl ObjectStorage for LocalFS { Ok(fs::remove_dir_all(path).await?) } - async fn try_delete_ingestor_meta( - &self, - ingestor_filename: String, - ) -> Result<(), ObjectStorageError> { - let path = self.root.join(ingestor_filename); + async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> { + let path = self.root.join(node_filename); Ok(fs::remove_file(path).await?) } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index dea669975..a40072f62 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -201,10 +201,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { &self, stream_name: &str, ) -> Result, ObjectStorageError>; - async fn try_delete_ingestor_meta( - &self, - ingestor_filename: String, - ) -> Result<(), ObjectStorageError>; + async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError>; /// Returns the amount of time taken by the `ObjectStore` to perform a get /// call. async fn get_latency(&self) -> Duration { diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 69b0acdfe..7a8d2bbdc 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -727,11 +727,8 @@ impl ObjectStorage for S3 { Ok(()) } - async fn try_delete_ingestor_meta( - &self, - ingestor_filename: String, - ) -> Result<(), ObjectStorageError> { - let file = RelativePathBuf::from(&ingestor_filename); + async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> { + let file = RelativePathBuf::from(&node_filename); match self.client.delete(&to_object_store_path(&file)).await { Ok(_) => Ok(()), Err(err) => { @@ -741,7 +738,7 @@ impl ObjectStorage for S3 { error!("Node does not exist"); Err(err.into()) } else { - error!("Error deleting ingestor meta file: {:?}", err); + error!("Error deleting node meta file: {:?}", err); Err(err.into()) } }