Skip to content

feat: delete indexer from cluster #1284

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 52 additions & 28 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod utils;

use futures::{future, stream, StreamExt};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use actix_web::http::header::{self, HeaderMap};
Expand All @@ -31,7 +32,7 @@ use clokwerk::{AsyncScheduler, Interval};
use http::{header as http_header, StatusCode};
use itertools::Itertools;
use relative_path::RelativePathBuf;
use serde::de::Error;
use serde::de::{DeserializeOwned, Error};
use serde_json::error::Error as SerdeError;
use serde_json::{to_vec, Value as JsonValue};
use tracing::{error, info, warn};
Expand All @@ -45,7 +46,8 @@ use crate::rbac::role::model::DefaultPrivilege;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::{
ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY,
ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY,
STREAM_ROOT_DIRECTORY,
};
use crate::HTTP_CLIENT;

Expand Down Expand Up @@ -642,6 +644,9 @@ async fn fetch_nodes_info<T: Metadata>(
nodes: Vec<T>,
) -> Result<Vec<utils::ClusterInfo>, StreamError> {
let nodes_len = nodes.len();
if nodes_len == 0 {
return Ok(vec![]);
}
let results = stream::iter(nodes)
.map(|node| async move { fetch_node_info(&node).await })
.buffer_unordered(nodes_len) // No concurrency limit
Expand Down Expand Up @@ -702,52 +707,71 @@ pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
Ok(arr)
}

pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, PostError> {
let domain_name = to_url_string(ingestor.into_inner());
pub async fn remove_node(node_url: Path<String>) -> Result<impl Responder, PostError> {
let domain_name = to_url_string(node_url.into_inner());

if check_liveness(&domain_name).await {
return Err(PostError::Invalid(anyhow::anyhow!(
"The ingestor is currently live and cannot be removed"
"The node is currently live and cannot be removed"
)));
}
let object_store = PARSEABLE.storage.get_object_store();

let ingestor_metadatas = object_store
// Delete ingestor metadata
let removed_ingestor =
remove_node_metadata::<IngestorMetadata>(&object_store, &domain_name).await?;

// Delete indexer metadata
let removed_indexer =
remove_node_metadata::<IndexerMetadata>(&object_store, &domain_name).await?;

let msg = if removed_ingestor || removed_indexer {
format!("node {} removed successfully", domain_name)
} else {
format!("node {} is not found", domain_name)
};

info!("{}", &msg);
Ok((msg, StatusCode::OK))
}

// Helper function to remove a specific type of node metadata
async fn remove_node_metadata<T: Metadata + DeserializeOwned + Default>(
object_store: &Arc<dyn ObjectStorage>,
domain_name: &str,
) -> Result<bool, PostError> {
let node_type = T::default().node_type().to_string();

let metadatas = object_store
.get_objects(
Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)),
Box::new(|file_name| file_name.starts_with("ingestor")),
Box::new(move |file_name| file_name.starts_with(&node_type)),
)
.await?;

let ingestor_metadata = ingestor_metadatas
let node_metadatas = metadatas
.iter()
.map(|elem| serde_json::from_slice::<IngestorMetadata>(elem).unwrap_or_default())
.collect_vec();
.filter_map(|elem| match serde_json::from_slice::<T>(elem) {
Ok(meta) if meta.domain_name() == domain_name => Some(meta),
_ => None,
})
.collect::<Vec<_>>();

let ingestor_metadata = ingestor_metadata
.iter()
.filter(|elem| elem.domain_name == domain_name)
.collect_vec();
if node_metadatas.is_empty() {
return Ok(false);
}

let ingestor_meta_filename = ingestor_metadata[0].file_path().to_string();
let msg = match object_store
.try_delete_ingestor_meta(ingestor_meta_filename)
.await
{
Ok(_) => {
format!("Ingestor {} removed successfully", domain_name)
}
let node_meta_filename = node_metadatas[0].file_path().to_string();
match object_store.try_delete_node_meta(node_meta_filename).await {
Ok(_) => Ok(true),
Err(err) => {
if matches!(err, ObjectStorageError::IoError(_)) {
format!("Ingestor {} is not found", domain_name)
Ok(false)
} else {
format!("Error removing ingestor {}\n Reason: {}", domain_name, err)
Err(PostError::ObjectStorageError(err))
}
}
};

info!("{}", &msg);
Ok((msg, StatusCode::OK))
}
}

/// Fetches metrics from a node (ingestor or indexer)
Expand Down
7 changes: 7 additions & 0 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ pub trait Metadata {
fn domain_name(&self) -> &str;
fn token(&self) -> &str;
fn node_type(&self) -> &str;
fn file_path(&self) -> RelativePathBuf;
}

impl Metadata for IngestorMetadata {
Expand All @@ -566,6 +567,9 @@ impl Metadata for IngestorMetadata {
fn node_type(&self) -> &str {
"ingestor"
}
fn file_path(&self) -> RelativePathBuf {
self.file_path()
}
}

impl Metadata for IndexerMetadata {
Expand All @@ -579,6 +583,9 @@ impl Metadata for IndexerMetadata {
fn node_type(&self) -> &str {
"indexer"
}
fn file_path(&self) -> RelativePathBuf {
self.file_path()
}
}
#[cfg(test)]
mod test {
Expand Down
8 changes: 4 additions & 4 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,13 @@ impl QueryServer {
.authorize(Action::ListClusterMetrics),
),
)
// DELETE "/cluster/{ingestor_domain:port}" ==> Delete an ingestor from the cluster
// DELETE "/cluster/{node_domain:port}" ==> Delete a node from the cluster
.service(
web::scope("/{ingestor}").service(
web::scope("/{node_url}").service(
web::resource("").route(
web::delete()
.to(cluster::remove_ingestor)
.authorize(Action::Deleteingestor),
.to(cluster::remove_node)
.authorize(Action::DeleteNode),
),
),
)
Expand Down
4 changes: 2 additions & 2 deletions src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum Action {
QueryLLM,
ListCluster,
ListClusterMetrics,
Deleteingestor,
DeleteNode,
All,
GetAnalytics,
ListDashboard,
Expand Down Expand Up @@ -128,7 +128,7 @@ impl RoleBuilder {
| Action::DeleteCorrelation
| Action::GetCorrelation
| Action::PutCorrelation
| Action::Deleteingestor
| Action::DeleteNode
| Action::PutHotTierEnabled
| Action::GetHotTierEnabled
| Action::DeleteHotTierEnabled
Expand Down
9 changes: 3 additions & 6 deletions src/storage/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,11 +644,8 @@ impl ObjectStorage for BlobStore {
Ok(())
}

async fn try_delete_ingestor_meta(
&self,
ingestor_filename: String,
) -> Result<(), ObjectStorageError> {
let file = RelativePathBuf::from(&ingestor_filename);
async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> {
let file = RelativePathBuf::from(&node_filename);
match self.client.delete(&to_object_store_path(&file)).await {
Ok(_) => Ok(()),
Err(err) => {
Expand All @@ -658,7 +655,7 @@ impl ObjectStorage for BlobStore {
error!("Node does not exist");
Err(err.into())
} else {
error!("Error deleting ingestor meta file: {:?}", err);
error!("Error deleting node meta file: {:?}", err);
Err(err.into())
}
}
Expand Down
7 changes: 2 additions & 5 deletions src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,8 @@ impl ObjectStorage for LocalFS {
Ok(fs::remove_dir_all(path).await?)
}

async fn try_delete_ingestor_meta(
&self,
ingestor_filename: String,
) -> Result<(), ObjectStorageError> {
let path = self.root.join(ingestor_filename);
async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> {
let path = self.root.join(node_filename);
Ok(fs::remove_file(path).await?)
}

Expand Down
5 changes: 1 addition & 4 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
&self,
stream_name: &str,
) -> Result<Vec<RelativePathBuf>, ObjectStorageError>;
async fn try_delete_ingestor_meta(
&self,
ingestor_filename: String,
) -> Result<(), ObjectStorageError>;
async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError>;
/// Returns the amount of time taken by the `ObjectStore` to perform a get
/// call.
async fn get_latency(&self) -> Duration {
Expand Down
9 changes: 3 additions & 6 deletions src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,11 +727,8 @@ impl ObjectStorage for S3 {
Ok(())
}

async fn try_delete_ingestor_meta(
&self,
ingestor_filename: String,
) -> Result<(), ObjectStorageError> {
let file = RelativePathBuf::from(&ingestor_filename);
async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> {
let file = RelativePathBuf::from(&node_filename);
match self.client.delete(&to_object_store_path(&file)).await {
Ok(_) => Ok(()),
Err(err) => {
Expand All @@ -741,7 +738,7 @@ impl ObjectStorage for S3 {
error!("Node does not exist");
Err(err.into())
} else {
error!("Error deleting ingestor meta file: {:?}", err);
error!("Error deleting node meta file: {:?}", err);
Err(err.into())
}
}
Expand Down
Loading