Skip to content

Commit 2b3b2d4

Browse files
author
Devdutt Shenoi
committed
feat: parallelize flush+conversion per stream
1 parent 8219e43 commit 2b3b2d4

File tree

5 files changed

+47
-57
lines changed

5 files changed

+47
-57
lines changed

src/handlers/http/health_check.rs

Lines changed: 24 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,8 @@ use actix_web::{
2727
HttpResponse,
2828
};
2929
use http::StatusCode;
30-
use tokio::sync::Mutex;
30+
use tokio::{sync::Mutex, task::JoinSet};
31+
use tracing::{error, info, warn};
3132

3233
use crate::parseable::PARSEABLE;
3334

@@ -60,8 +61,29 @@ pub async fn shutdown() {
6061
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
6162
*shutdown_flag = true;
6263

64+
let mut joinset = JoinSet::new();
65+
6366
// Sync staging
64-
PARSEABLE.flush_all_streams();
67+
PARSEABLE.streams.flush_and_convert(&mut joinset, true);
68+
69+
while let Some(res) = joinset.join_next().await {
70+
match res {
71+
Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
72+
Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
73+
Err(err) => error!("Failed to join async task: {err}"),
74+
}
75+
}
76+
77+
if let Err(e) = PARSEABLE
78+
.storage
79+
.get_object_store()
80+
.upload_files_from_staging()
81+
.await
82+
{
83+
warn!("Failed to sync local data with object store. {:?}", e);
84+
} else {
85+
info!("Successfully synced all data to S3.");
86+
}
6587
}
6688

6789
pub async fn readiness() -> HttpResponse {

src/handlers/http/modal/mod.rs

Lines changed: 0 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -136,26 +136,6 @@ pub trait ParseableServer {
136136

137137
health_check::shutdown().await;
138138

139-
// Perform S3 sync and wait for completion
140-
info!("Starting data sync to S3...");
141-
142-
if let Err(e) = PARSEABLE.streams.prepare_parquet(true) {
143-
warn!("Failed to convert arrow files to parquet. {:?}", e);
144-
} else {
145-
info!("Successfully converted arrow files to parquet.");
146-
}
147-
148-
if let Err(e) = PARSEABLE
149-
.storage
150-
.get_object_store()
151-
.upload_files_from_staging()
152-
.await
153-
{
154-
warn!("Failed to sync local data with object store. {:?}", e);
155-
} else {
156-
info!("Successfully synced all data to S3.");
157-
}
158-
159139
// Initiate graceful shutdown
160140
info!("Graceful shutdown of HTTP server triggered");
161141
srv_handle.stop(true).await;

src/parseable/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -179,16 +179,6 @@ impl Parseable {
179179
.unwrap_or_default())
180180
}
181181

182-
/// Writes all streams in staging onto disk, awaiting conversion into parquet.
183-
/// Deletes all in memory recordbatches, freeing up rows in mem-writer.
184-
pub fn flush_all_streams(&self) {
185-
let streams = self.streams.read().unwrap();
186-
187-
for staging in streams.values() {
188-
staging.flush()
189-
}
190-
}
191-
192182
// validate the storage, if the proper path for staging directory is provided
193183
// if the proper data directory is provided, or s3 bucket is provided etc
194184
pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {

src/parseable/streams.rs

Lines changed: 16 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@ use parquet::{
4141
};
4242
use rand::distributions::DistString;
4343
use relative_path::RelativePathBuf;
44+
use tokio::task::JoinSet;
4445
use tracing::{error, info, trace, warn};
4546

4647
use crate::{
@@ -651,6 +652,13 @@ impl Stream {
651652
pub fn get_stream_type(&self) -> StreamType {
652653
self.metadata.read().expect(LOCK_EXPECT).stream_type
653654
}
655+
656+
/// First flushes arrows onto disk and then converts the arrow into parquet
657+
pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
658+
self.flush();
659+
660+
self.prepare_parquet(shutdown_signal)
661+
}
654662
}
655663

656664
#[derive(Deref, DerefMut, Default)]
@@ -717,21 +725,22 @@ impl Streams {
717725
.collect()
718726
}
719727

720-
/// Convert arrow files into parquet, preparing it for upload
721-
pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
728+
/// Asynchronously flushes arrows and compacts into parquet data on all streams in staging,
729+
/// so that it is ready to be pushed onto objectstore.
730+
pub fn flush_and_convert(
731+
&self,
732+
joinset: &mut JoinSet<Result<(), StagingError>>,
733+
shutdown_signal: bool,
734+
) {
722735
let streams: Vec<Arc<Stream>> = self
723736
.read()
724737
.expect(LOCK_EXPECT)
725738
.values()
726739
.map(Arc::clone)
727740
.collect();
728741
for stream in streams {
729-
stream
730-
.prepare_parquet(shutdown_signal)
731-
.inspect_err(|err| error!("Failed to run conversion task {err:?}"))?;
742+
joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) });
732743
}
733-
734-
Ok(())
735744
}
736745
}
737746

src/sync.rs

Lines changed: 7 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -190,11 +190,15 @@ pub fn local_sync() -> (
190190
select! {
191191
// Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds
192192
_ = sync_interval.tick() => {
193-
joinset.spawn(flush_and_convert());
193+
PARSEABLE.streams.flush_and_convert(&mut joinset, false)
194194
},
195195
// Joins and logs errors in spawned tasks
196-
Some(Err(e)) = joinset.join_next(), if !joinset.is_empty() => {
197-
error!("Issue joining flush+conversion: {e}")
196+
Some(res) = joinset.join_next(), if !joinset.is_empty() => {
197+
match res {
198+
Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
199+
Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
200+
Err(err) => error!("Issue joining flush+conversion task: {err}"),
201+
}
198202
}
199203
res = &mut inbox_rx => {match res{
200204
Ok(_) => break,
@@ -269,18 +273,3 @@ pub fn schedule_alert_task(
269273
});
270274
Ok(handle)
271275
}
272-
273-
/// Asynchronously flushes all streams when called, then compacts them into parquet files ready to be pushed onto objectstore
274-
async fn flush_and_convert() {
275-
trace!("Flushing Arrows to disk...");
276-
PARSEABLE.flush_all_streams();
277-
278-
trace!("Converting Arrow to Parquet... ");
279-
if let Err(e) = monitor_task_duration("arrow_conversion", Duration::from_secs(30), || async {
280-
PARSEABLE.streams.prepare_parquet(false)
281-
})
282-
.await
283-
{
284-
warn!("failed to convert local arrow data to parquet. {e:?}");
285-
}
286-
}

0 commit comments

Comments (0)