Skip to content

Commit 447db8b

Browse files
fmassotfulmicoton
andauthored
Remove publisher finalize and shut down property a merge pipeline wit… (#2263)
* Remove publisher finalize and shut down property a merge pipeline with no corresponding indexing pipelines. * Fixing one unit test. The indexing pipeline updates its statistics on its finalize. * Using quickwit-actors with the testsuite feature when running tests. * Using the HEARTBEAT as the delete pipeline observation period. * Apply suggestions from code review Co-authored-by: Paul Masurel <[email protected]>
1 parent 2b4def7 commit 447db8b

File tree

10 files changed

+235
-39
lines changed

10 files changed

+235
-39
lines changed

quickwit/quickwit-actors/src/actor.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::convert::Infallible;
2222
use std::fmt;
2323
use std::ops::Deref;
2424
use std::sync::atomic::{AtomicUsize, Ordering};
25-
use std::sync::{Arc, Mutex};
25+
use std::sync::Arc;
2626
use std::time::Duration;
2727

2828
use async_trait::async_trait;
@@ -239,7 +239,7 @@ pub struct ActorContextInner<A: Actor> {
239239
// This counter is useful to unsure that obsolete WakeUp
240240
// events do not effect ulterior `sleep`.
241241
sleep_count: AtomicUsize,
242-
observable_state_tx: Mutex<watch::Sender<A::ObservableState>>,
242+
observable_state_tx: watch::Sender<A::ObservableState>,
243243
}
244244

245245
/// Internal command used to resume an actor that was paused using
@@ -285,7 +285,7 @@ impl<A: Actor> ActorContext<A> {
285285
registry,
286286
actor_state: AtomicState::default(),
287287
sleep_count: AtomicUsize::default(),
288-
observable_state_tx: Mutex::new(observable_state_tx),
288+
observable_state_tx,
289289
}
290290
.into(),
291291
}
@@ -408,11 +408,7 @@ impl<A: Actor> ActorContext<A> {
408408

409409
pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState {
410410
let obs_state = actor.observable_state();
411-
let _ = self
412-
.observable_state_tx
413-
.lock()
414-
.unwrap()
415-
.send(obs_state.clone());
411+
let _ = self.observable_state_tx.send(obs_state.clone());
416412
obs_state
417413
}
418414

quickwit/quickwit-actors/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub use self::supervisor::{Supervisor, SupervisorState};
6868
/// If an actor does not advertise a progress within an interval of duration `HEARTBEAT`,
6969
/// its supervisor will consider it as blocked and will proceed to kill it, as well
7070
/// as all of the actors all the actors that share the killswitch.
71-
pub const HEARTBEAT: Duration = if cfg!(test) {
71+
pub const HEARTBEAT: Duration = if cfg!(any(test, feature = "testsuite")) {
7272
// Right now some unit test end when we detect that a
7373
// pipeline has terminated, which can require waiting
7474
// for a heartbeat.

quickwit/quickwit-actors/src/spawn_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ async fn actor_loop<A: Actor>(actor: A, inbox: Inbox<A>, ctx: ActorContext<A>) -
290290
};
291291

292292
let final_exit_status = actor_env.finalize(after_process_exit_status).await;
293+
// The last observation is collected on `ActorExecutionEnv::Drop`.
293294
actor_env.process_exit_status(&final_exit_status);
294295
final_exit_status
295296
}

quickwit/quickwit-cli/src/index.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,8 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
917917
quickwit_storage_uri_resolver().clone(),
918918
)
919919
.await?;
920-
let (indexing_server_mailbox, _) = universe.spawn_builder().spawn(indexing_server);
920+
let (indexing_server_mailbox, indexing_server_handle) =
921+
universe.spawn_builder().spawn(indexing_server);
921922
let pipeline_id = indexing_server_mailbox
922923
.ask_for_res(SpawnPipeline {
923924
index_id: args.index_id.clone(),
@@ -942,6 +943,11 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
942943
}
943944
let statistics =
944945
start_statistics_reporting_loop(pipeline_handle, args.input_path_opt.is_none()).await?;
946+
// Shutdown the indexing server.
947+
universe
948+
.send_exit_with_success(&indexing_server_mailbox)
949+
.await?;
950+
indexing_server_handle.join().await;
945951
if statistics.num_published_splits > 0 {
946952
println!(
947953
"Now, you can query the index with the following command:\nquickwit index search \

quickwit/quickwit-common/src/net.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,6 @@ impl Display for HostAddr {
210210
/// Finds a random available TCP port.
211211
///
212212
/// This function induces a race condition, use it only in unit tests.
213-
#[cfg(any(test, feature = "testsuite"))]
214213
pub fn find_available_tcp_port() -> anyhow::Result<u16> {
215214
let socket: SocketAddr = ([127, 0, 0, 1], 0u16).into();
216215
let listener = TcpListener::bind(socket)?;

quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,35 @@ impl Actor for IndexingPipeline {
117117
self.handle(Supervise, ctx).await?;
118118
Ok(())
119119
}
120+
121+
async fn finalize(
122+
&mut self,
123+
_exit_status: &ActorExitStatus,
124+
_ctx: &ActorContext<Self>,
125+
) -> anyhow::Result<()> {
126+
// We update the observation to ensure our last "black box" observation
127+
// is up to date.
128+
if let Some(handles) = &self.handles {
129+
let (doc_processor_counters, indexer_counters, uploader_counters, publisher_counters) = join!(
130+
handles.doc_processor.observe(),
131+
handles.indexer.observe(),
132+
handles.uploader.observe(),
133+
handles.publisher.observe(),
134+
);
135+
self.statistics = self
136+
.previous_generations_statistics
137+
.clone()
138+
.add_actor_counters(
139+
&*doc_processor_counters,
140+
&*indexer_counters,
141+
&*uploader_counters,
142+
&*publisher_counters,
143+
)
144+
.set_generation(self.statistics.generation)
145+
.set_num_spawn_attempts(self.statistics.num_spawn_attempts);
146+
}
147+
Ok(())
148+
}
120149
}
121150

122151
impl IndexingPipeline {
@@ -475,13 +504,15 @@ mod tests {
475504
use std::path::PathBuf;
476505
use std::sync::Arc;
477506

478-
use quickwit_actors::Universe;
479-
use quickwit_config::{IndexingSettings, SourceParams};
507+
use quickwit_actors::{Command, Universe};
508+
use quickwit_config::{IndexingSettings, SourceParams, VoidSourceParams};
480509
use quickwit_doc_mapper::default_doc_mapper_for_test;
481510
use quickwit_metastore::{IndexMetadata, MetastoreError, MockMetastore};
482511
use quickwit_storage::RamStorage;
483512

484513
use super::{IndexingPipeline, *};
514+
use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
515+
use crate::merge_policy::default_merge_policy;
485516
use crate::models::IndexingDirectory;
486517

487518
#[test]
@@ -674,4 +705,82 @@ mod tests {
674705
assert_eq!(pipeline_statistics.num_published_splits, 1);
675706
Ok(())
676707
}
708+
709+
#[tokio::test]
710+
async fn test_merge_pipeline_does_not_stop_on_indexing_pipeline_failure() -> anyhow::Result<()>
711+
{
712+
let mut metastore = MockMetastore::default();
713+
metastore
714+
.expect_index_metadata()
715+
.withf(|index_id| index_id == "test-index")
716+
.returning(|_| {
717+
Ok(IndexMetadata::for_test(
718+
"test-index",
719+
"ram:///indexes/test-index",
720+
))
721+
});
722+
metastore
723+
.expect_list_splits()
724+
.returning(|_, _, _, _| Ok(Vec::new()));
725+
let universe = Universe::new();
726+
let node_id = "test-node";
727+
let metastore = Arc::new(metastore);
728+
let doc_mapper = Arc::new(default_doc_mapper_for_test());
729+
let pipeline_id = IndexingPipelineId {
730+
index_id: "test-index".to_string(),
731+
source_id: "test-source".to_string(),
732+
node_id: node_id.to_string(),
733+
pipeline_ord: 0,
734+
};
735+
let source_config = SourceConfig {
736+
source_id: "test-source".to_string(),
737+
num_pipelines: 1,
738+
enabled: true,
739+
source_params: SourceParams::Void(VoidSourceParams),
740+
};
741+
let storage = Arc::new(RamStorage::default());
742+
let split_store = IndexingSplitStore::create_without_local_store(storage.clone());
743+
let merge_pipeline_params = MergePipelineParams {
744+
pipeline_id: pipeline_id.clone(),
745+
doc_mapper: doc_mapper.clone(),
746+
indexing_directory: IndexingDirectory::for_test().await,
747+
metastore: metastore.clone(),
748+
split_store: split_store.clone(),
749+
merge_policy: default_merge_policy(),
750+
max_concurrent_split_uploads: 2,
751+
merge_max_io_num_bytes_per_sec: None,
752+
};
753+
let merge_pipeline = MergePipeline::new(merge_pipeline_params);
754+
let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone();
755+
let (_merge_pipeline_mailbox, merge_pipeline_handler) =
756+
universe.spawn_builder().spawn(merge_pipeline);
757+
let indexing_pipeline_params = IndexingPipelineParams {
758+
pipeline_id,
759+
doc_mapper,
760+
source_config,
761+
indexing_directory: IndexingDirectory::for_test().await,
762+
indexing_settings: IndexingSettings::for_test(),
763+
metastore: metastore.clone(),
764+
queues_dir_path: PathBuf::from("./queues"),
765+
storage,
766+
split_store,
767+
max_concurrent_split_uploads_index: 4,
768+
max_concurrent_split_uploads_merge: 5,
769+
merge_planner_mailbox: merge_planner_mailbox.clone(),
770+
};
771+
let indexing_pipeline = IndexingPipeline::new(indexing_pipeline_params);
772+
let (_indexing_pipeline_mailbox, indexing_pipeline_handler) =
773+
universe.spawn_builder().spawn(indexing_pipeline);
774+
assert_eq!(indexing_pipeline_handler.observe().await.generation, 1);
775+
// Let's shutdown the indexer, this will trigger the the indexing pipeline failure and the
776+
// restart.
777+
let indexer = universe.get::<Indexer>().into_iter().next().unwrap();
778+
indexer.send_message(Command::Quit).await.unwrap();
779+
tokio::time::sleep(Duration::from_secs(2)).await;
780+
// Check indexing pipeline has restarted.
781+
assert_eq!(indexing_pipeline_handler.observe().await.generation, 2);
782+
// Check that the merge pipeline is still up.
783+
assert_eq!(merge_pipeline_handler.health(), Health::Healthy);
784+
Ok(())
785+
}
677786
}

quickwit/quickwit-indexing/src/actors/indexing_service.rs

Lines changed: 107 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,13 @@ pub struct IndexingServiceState {
9090
pub num_running_pipelines: usize,
9191
pub num_successful_pipelines: usize,
9292
pub num_failed_pipelines: usize,
93+
pub num_running_merge_pipelines: usize,
9394
}
9495

9596
type IndexId = String;
9697
type SourceId = String;
9798

98-
#[derive(Hash, Eq, PartialEq)]
99+
#[derive(Clone, Hash, Eq, PartialEq)]
99100
struct MergePipelineId {
100101
index_id: String,
101102
source_id: String,
@@ -385,19 +386,40 @@ impl IndexingService {
385386
}
386387
},
387388
);
388-
// Evict merge pipelines that are not needed or failing.
389+
// Evict and kill merge pipelines that are not needed.
389390
let needed_merge_pipeline_ids: HashSet<MergePipelineId> = self
390391
.indexing_pipeline_handles
391392
.keys()
392393
.map(MergePipelineId::from)
393394
.collect();
395+
let current_merge_pipeline_ids: HashSet<MergePipelineId> =
396+
self.merge_pipeline_handles.keys().cloned().collect();
397+
for merge_pipeline_id_to_shut_down in
398+
current_merge_pipeline_ids.difference(&needed_merge_pipeline_ids)
399+
{
400+
if let Some((_, merge_pipeline_handle)) = self
401+
.merge_pipeline_handles
402+
.remove_entry(merge_pipeline_id_to_shut_down)
403+
{
404+
// We kill the merge pipeline to avoid waiting a merge operation to finish as it can
405+
// be long.
406+
info!(
407+
index_id=%merge_pipeline_id_to_shut_down.index_id,
408+
source_id=%merge_pipeline_id_to_shut_down.source_id,
409+
"No more indexing pipeline on this index and source, killing merge pipeline."
410+
);
411+
merge_pipeline_handle.handle.kill().await;
412+
}
413+
}
414+
// Finally remove the merge pipelien with an exit status.
394415
self.merge_pipeline_handles
395-
.retain(|merge_pipeline_id, merge_pipeline_mailbox_handle| {
416+
.retain(|_, merge_pipeline_mailbox_handle| {
396417
match merge_pipeline_mailbox_handle.handle.health() {
397-
Health::Healthy => needed_merge_pipeline_ids.contains(merge_pipeline_id),
418+
Health::Healthy => true,
398419
Health::FailureOrUnhealthy | Health::Success => false,
399420
}
400421
});
422+
self.state.num_running_merge_pipelines = self.merge_pipeline_handles.len();
401423
Ok(())
402424
}
403425

@@ -617,7 +639,7 @@ impl Handler<ShutdownPipeline> for IndexingService {
617639
mod tests {
618640
use std::time::Duration;
619641

620-
use quickwit_actors::{ObservationType, Universe};
642+
use quickwit_actors::{ObservationType, Universe, HEARTBEAT};
621643
use quickwit_common::rand::append_random_suffix;
622644
use quickwit_common::uri::Uri;
623645
use quickwit_config::{SourceConfig, VecSourceParams};
@@ -853,4 +875,84 @@ mod tests {
853875
}
854876
panic!("Sleep");
855877
}
878+
879+
#[tokio::test]
880+
async fn test_indexing_service_shut_down_merge_pipeline_when_no_indexing_pipeline() {
881+
quickwit_common::setup_logging_for_tests();
882+
let metastore_uri = Uri::from_well_formed("ram:///metastore".to_string());
883+
let metastore = quickwit_metastore_uri_resolver()
884+
.resolve(&metastore_uri)
885+
.await
886+
.unwrap();
887+
888+
let index_id = append_random_suffix("test-indexing-service");
889+
let index_uri = format!("ram:///indexes/{index_id}");
890+
let index_metadata = IndexMetadata::for_test(&index_id, &index_uri);
891+
892+
let source_config = SourceConfig {
893+
source_id: "test-indexing-service--source".to_string(),
894+
num_pipelines: 1,
895+
enabled: true,
896+
source_params: SourceParams::void(),
897+
};
898+
metastore.create_index(index_metadata).await.unwrap();
899+
metastore
900+
.add_source(&index_id, source_config.clone())
901+
.await
902+
.unwrap();
903+
904+
// Test `IndexingService::new`.
905+
let temp_dir = tempfile::tempdir().unwrap();
906+
let data_dir_path = temp_dir.path().to_path_buf();
907+
let indexer_config = IndexerConfig::for_test().unwrap();
908+
let storage_resolver = StorageUriResolver::for_test();
909+
let universe = Universe::new();
910+
let queues_dir_path = data_dir_path.join(QUEUES_DIR_NAME);
911+
init_ingest_api(&universe, &queues_dir_path).await.unwrap();
912+
let indexing_server = IndexingService::new(
913+
"test-node".to_string(),
914+
data_dir_path,
915+
indexer_config,
916+
metastore.clone(),
917+
storage_resolver.clone(),
918+
)
919+
.await
920+
.unwrap();
921+
let (indexing_server_mailbox, indexing_server_handle) =
922+
universe.spawn_builder().spawn(indexing_server);
923+
indexing_server_mailbox
924+
.ask_for_res(SpawnPipelines {
925+
index_id: index_id.clone(),
926+
})
927+
.await
928+
.unwrap();
929+
let observation = indexing_server_handle.observe().await;
930+
assert_eq!(observation.num_running_pipelines, 1);
931+
assert_eq!(observation.num_failed_pipelines, 0);
932+
assert_eq!(observation.num_successful_pipelines, 0);
933+
934+
// Test `shutdown_pipeline`
935+
indexing_server_mailbox
936+
.ask_for_res(ShutdownPipelines {
937+
index_id: index_id.clone(),
938+
source_id: None,
939+
})
940+
.await
941+
.unwrap();
942+
assert_eq!(
943+
indexing_server_handle.observe().await.num_running_pipelines,
944+
0
945+
);
946+
assert_eq!(
947+
indexing_server_handle
948+
.observe()
949+
.await
950+
.num_running_merge_pipelines,
951+
0
952+
);
953+
universe.simulate_time_shift(HEARTBEAT).await;
954+
// Check that the merge pipeline is also shut down as they are no more indexing pipeilne on
955+
// the index.
956+
assert!(universe.get_one::<MergePipeline>().is_none());
957+
}
856958
}

0 commit comments

Comments
 (0)