Skip to content

Commit 1a57dc1

Browse files
feat: delete indexer from cluster
DELETE /cluster/{node_url} to handle deletion of all nodes including ingestor and indexer
1 parent d00ac8c commit 1a57dc1

File tree

7 files changed

+64
-33
lines changed

7 files changed

+64
-33
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 52 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub mod utils;
2020

2121
use futures::{future, stream, StreamExt};
2222
use std::collections::HashSet;
23+
use std::sync::Arc;
2324
use std::time::Duration;
2425

2526
use actix_web::http::header::{self, HeaderMap};
@@ -31,7 +32,7 @@ use clokwerk::{AsyncScheduler, Interval};
3132
use http::{header as http_header, StatusCode};
3233
use itertools::Itertools;
3334
use relative_path::RelativePathBuf;
34-
use serde::de::Error;
35+
use serde::de::{DeserializeOwned, Error};
3536
use serde_json::error::Error as SerdeError;
3637
use serde_json::{to_vec, Value as JsonValue};
3738
use tracing::{error, info, warn};
@@ -45,7 +46,8 @@ use crate::rbac::role::model::DefaultPrivilege;
4546
use crate::rbac::user::User;
4647
use crate::stats::Stats;
4748
use crate::storage::{
48-
ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY,
49+
ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY,
50+
STREAM_ROOT_DIRECTORY,
4951
};
5052
use crate::HTTP_CLIENT;
5153

@@ -642,6 +644,9 @@ async fn fetch_nodes_info<T: Metadata>(
642644
nodes: Vec<T>,
643645
) -> Result<Vec<utils::ClusterInfo>, StreamError> {
644646
let nodes_len = nodes.len();
647+
if nodes_len == 0 {
648+
return Ok(vec![]);
649+
}
645650
let results = stream::iter(nodes)
646651
.map(|node| async move { fetch_node_info(&node).await })
647652
.buffer_unordered(nodes_len) // No concurrency limit
@@ -702,52 +707,71 @@ pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
702707
Ok(arr)
703708
}
704709

705-
pub async fn remove_ingestor(ingestor: Path<String>) -> Result<impl Responder, PostError> {
706-
let domain_name = to_url_string(ingestor.into_inner());
710+
pub async fn remove_node(node_url: Path<String>) -> Result<impl Responder, PostError> {
711+
let domain_name = to_url_string(node_url.into_inner());
707712

708713
if check_liveness(&domain_name).await {
709714
return Err(PostError::Invalid(anyhow::anyhow!(
710-
"The ingestor is currently live and cannot be removed"
715+
"The node is currently live and cannot be removed"
711716
)));
712717
}
713718
let object_store = PARSEABLE.storage.get_object_store();
714719

715-
let ingestor_metadatas = object_store
720+
// Delete ingestor metadata
721+
let removed_ingestor =
722+
remove_node_metadata::<IngestorMetadata>(&object_store, &domain_name).await?;
723+
724+
// Delete indexer metadata
725+
let removed_indexer =
726+
remove_node_metadata::<IndexerMetadata>(&object_store, &domain_name).await?;
727+
728+
let msg = if removed_ingestor || removed_indexer {
729+
format!("node {} removed successfully", domain_name)
730+
} else {
731+
format!("node {} is not found", domain_name)
732+
};
733+
734+
info!("{}", &msg);
735+
Ok((msg, StatusCode::OK))
736+
}
737+
738+
// Helper function to remove a specific type of node metadata
739+
async fn remove_node_metadata<T: Metadata + DeserializeOwned + Default>(
740+
object_store: &Arc<dyn ObjectStorage>,
741+
domain_name: &str,
742+
) -> Result<bool, PostError> {
743+
let node_type = T::default().node_type().to_string();
744+
745+
let metadatas = object_store
716746
.get_objects(
717747
Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)),
718-
Box::new(|file_name| file_name.starts_with("ingestor")),
748+
Box::new(move |file_name| file_name.starts_with(&node_type)),
719749
)
720750
.await?;
721751

722-
let ingestor_metadata = ingestor_metadatas
752+
let node_metadatas = metadatas
723753
.iter()
724-
.map(|elem| serde_json::from_slice::<IngestorMetadata>(elem).unwrap_or_default())
725-
.collect_vec();
754+
.filter_map(|elem| match serde_json::from_slice::<T>(elem) {
755+
Ok(meta) if meta.domain_name() == domain_name => Some(meta),
756+
_ => None,
757+
})
758+
.collect::<Vec<_>>();
726759

727-
let ingestor_metadata = ingestor_metadata
728-
.iter()
729-
.filter(|elem| elem.domain_name == domain_name)
730-
.collect_vec();
760+
if node_metadatas.is_empty() {
761+
return Ok(false);
762+
}
731763

732-
let ingestor_meta_filename = ingestor_metadata[0].file_path().to_string();
733-
let msg = match object_store
734-
.try_delete_ingestor_meta(ingestor_meta_filename)
735-
.await
736-
{
737-
Ok(_) => {
738-
format!("Ingestor {} removed successfully", domain_name)
739-
}
764+
let node_meta_filename = node_metadatas[0].file_path().to_string();
765+
match object_store.try_delete_node_meta(node_meta_filename).await {
766+
Ok(_) => Ok(true),
740767
Err(err) => {
741768
if matches!(err, ObjectStorageError::IoError(_)) {
742-
format!("Ingestor {} is not found", domain_name)
769+
Ok(false)
743770
} else {
744-
format!("Error removing ingestor {}\n Reason: {}", domain_name, err)
771+
Err(PostError::ObjectStorageError(err))
745772
}
746773
}
747-
};
748-
749-
info!("{}", &msg);
750-
Ok((msg, StatusCode::OK))
774+
}
751775
}
752776

753777
/// Fetches metrics from a node (ingestor or indexer)

src/handlers/http/modal/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,7 @@ pub trait Metadata {
553553
fn domain_name(&self) -> &str;
554554
fn token(&self) -> &str;
555555
fn node_type(&self) -> &str;
556+
fn file_path(&self) -> RelativePathBuf;
556557
}
557558

558559
impl Metadata for IngestorMetadata {
@@ -566,6 +567,9 @@ impl Metadata for IngestorMetadata {
566567
fn node_type(&self) -> &str {
567568
"ingestor"
568569
}
570+
fn file_path(&self) -> RelativePathBuf {
571+
self.file_path()
572+
}
569573
}
570574

571575
impl Metadata for IndexerMetadata {
@@ -579,6 +583,9 @@ impl Metadata for IndexerMetadata {
579583
fn node_type(&self) -> &str {
580584
"indexer"
581585
}
586+
fn file_path(&self) -> RelativePathBuf {
587+
self.file_path()
588+
}
582589
}
583590
#[cfg(test)]
584591
mod test {

src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ impl QueryServer {
338338
web::scope("/{ingestor}").service(
339339
web::resource("").route(
340340
web::delete()
341-
.to(cluster::remove_ingestor)
341+
.to(cluster::remove_node)
342342
.authorize(Action::Deleteingestor),
343343
),
344344
),

src/storage/azure_blob.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ impl ObjectStorage for BlobStore {
644644
Ok(())
645645
}
646646

647-
async fn try_delete_ingestor_meta(
647+
async fn try_delete_node_meta(
648648
&self,
649649
ingestor_filename: String,
650650
) -> Result<(), ObjectStorageError> {

src/storage/localfs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ impl ObjectStorage for LocalFS {
325325
Ok(fs::remove_dir_all(path).await?)
326326
}
327327

328-
async fn try_delete_ingestor_meta(
328+
async fn try_delete_node_meta(
329329
&self,
330330
ingestor_filename: String,
331331
) -> Result<(), ObjectStorageError> {

src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
201201
&self,
202202
stream_name: &str,
203203
) -> Result<Vec<RelativePathBuf>, ObjectStorageError>;
204-
async fn try_delete_ingestor_meta(
204+
async fn try_delete_node_meta(
205205
&self,
206206
ingestor_filename: String,
207207
) -> Result<(), ObjectStorageError>;

src/storage/s3.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -727,7 +727,7 @@ impl ObjectStorage for S3 {
727727
Ok(())
728728
}
729729

730-
async fn try_delete_ingestor_meta(
730+
async fn try_delete_node_meta(
731731
&self,
732732
ingestor_filename: String,
733733
) -> Result<(), ObjectStorageError> {

0 commit comments

Comments
 (0)