Commit e7d8018

Author: Devdutt Shenoi
Merge branch 'main' into time-created
2 parents 764cba3 + 649c000

File tree: 9 files changed (+101 / -158 lines)


src/handlers/http/health_check.rs

Lines changed: 24 additions & 2 deletions
@@ -27,7 +27,8 @@ use actix_web::{
     HttpResponse,
 };
 use http::StatusCode;
-use tokio::sync::Mutex;
+use tokio::{sync::Mutex, task::JoinSet};
+use tracing::{error, info, warn};
 
 use crate::parseable::PARSEABLE;
 
@@ -60,8 +61,29 @@ pub async fn shutdown() {
     let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
     *shutdown_flag = true;
 
+    let mut joinset = JoinSet::new();
+
     // Sync staging
-    PARSEABLE.flush_all_streams();
+    PARSEABLE.streams.flush_and_convert(&mut joinset, true);
+
+    while let Some(res) = joinset.join_next().await {
+        match res {
+            Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
+            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+            Err(err) => error!("Failed to join async task: {err}"),
+        }
+    }
+
+    if let Err(e) = PARSEABLE
+        .storage
+        .get_object_store()
+        .upload_files_from_staging()
+        .await
+    {
+        warn!("Failed to sync local data with object store. {:?}", e);
+    } else {
+        info!("Successfully synced all data to S3.");
+    }
 }
 
 pub async fn readiness() -> HttpResponse {
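
For reference, a minimal sketch of the fan-out/join pattern the reworked `shutdown()` relies on. The stream names and the `Result<(), String>` payload are illustrative stand-ins, not Parseable's actual types:

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut joinset: JoinSet<Result<(), String>> = JoinSet::new();

    // Spawn one conversion task per stream, mirroring flush_and_convert().
    for stream in ["app-logs", "access-logs"] {
        joinset.spawn(async move {
            // Stand-in for Stream::flush_and_convert(shutdown_signal).
            println!("converting staged arrows for {stream}");
            Ok(())
        });
    }

    // join_next() yields Option<Result<T, JoinError>>: the outer Result
    // reports panics/cancellation, the inner one task-level failures.
    while let Some(res) = joinset.join_next().await {
        match res {
            Ok(Ok(())) => println!("conversion succeeded"),
            Ok(Err(err)) => println!("conversion failed: {err}"),
            Err(err) => println!("task panicked or was cancelled: {err}"),
        }
    }
}
```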

src/handlers/http/modal/mod.rs

Lines changed: 0 additions & 20 deletions
@@ -136,26 +136,6 @@ pub trait ParseableServer {
 
         health_check::shutdown().await;
 
-        // Perform S3 sync and wait for completion
-        info!("Starting data sync to S3...");
-
-        if let Err(e) = PARSEABLE.streams.prepare_parquet(true) {
-            warn!("Failed to convert arrow files to parquet. {:?}", e);
-        } else {
-            info!("Successfully converted arrow files to parquet.");
-        }
-
-        if let Err(e) = PARSEABLE
-            .storage
-            .get_object_store()
-            .upload_files_from_staging()
-            .await
-        {
-            warn!("Failed to sync local data with object store. {:?}", e);
-        } else {
-            info!("Successfully synced all data to S3.");
-        }
-
         // Initiate graceful shutdown
         info!("Graceful shutdown of HTTP server triggered");
         srv_handle.stop(true).await;
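
The removed block is not lost: it now runs inside `health_check::shutdown()` (see the first file above), so staging is fully synced before the server stops. A compilable sketch of the resulting ordering, with a hypothetical `sync_staging()` standing in for `health_check::shutdown()`:

```rust
use actix_web::dev::ServerHandle;

// Hypothetical stand-in for health_check::shutdown(): flush staging,
// convert arrows to parquet and upload them, then return.
async fn sync_staging() {
    println!("staging synced");
}

pub async fn graceful_shutdown(srv_handle: ServerHandle) {
    // 1. Finish all staging work first.
    sync_staging().await;
    // 2. Only then stop the HTTP server, draining in-flight requests.
    srv_handle.stop(true).await;
}
```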

src/lib.rs

Lines changed: 9 additions & 2 deletions
@@ -59,8 +59,15 @@ use reqwest::{Client, ClientBuilder};
 // It is very unlikely that panic will occur when dealing with locks.
 pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";
 
-pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
-pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;
+/// Describes the duration at the end of which in-memory buffers are flushed,
+/// arrows files are "finished" and compacted into parquet files.
+pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
+
+/// Duration used to configure prefix generation.
+pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;
+
+/// Describes the duration at the end of which parquets are pushed into objectstore.
+pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
 
 // A single HTTP client for all outgoing HTTP requests from the parseable server
 static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
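
Since both intervals are now `Duration`s built in a `const` context, the granularity derivation happens at compile time. A standalone sketch using the same names and the committed defaults:

```rust
use std::time::Duration;

// Same names and values as the commit; both from_secs() and as_secs()
// are const fns, so this all folds at compile time.
pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;

fn main() {
    // 60 s / 60 = 1-minute granularity for object-store prefixes.
    assert_eq!(OBJECT_STORE_DATA_GRANULARITY, 1);
    println!("granularity = {OBJECT_STORE_DATA_GRANULARITY} minute(s)");
}
```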

src/parseable/mod.rs

Lines changed: 0 additions & 10 deletions
@@ -179,16 +179,6 @@ impl Parseable {
             .unwrap_or_default())
     }
 
-    /// Writes all streams in staging onto disk, awaiting conversion into parquet.
-    /// Deletes all in memory recordbatches, freeing up rows in mem-writer.
-    pub fn flush_all_streams(&self) {
-        let streams = self.streams.read().unwrap();
-
-        for staging in streams.values() {
-            staging.flush()
-        }
-    }
-
     // validate the storage, if the proper path for staging directory is provided
     // if the proper data directory is provided, or s3 bucket is provided etc
     pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {

src/parseable/streams.rs

Lines changed: 38 additions & 24 deletions
@@ -42,6 +42,7 @@ use parquet::{
 };
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
+use tokio::task::JoinSet;
 use tracing::{error, info, trace, warn};
 
 use crate::{
@@ -50,11 +51,9 @@ use crate::{
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
-    storage::{
-        object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
-    },
+    storage::{object_storage::to_bytes, retention::Retention, StreamType},
     utils::minute_to_slot,
-    LOCK_EXPECT,
+    LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::{
@@ -446,21 +445,27 @@
                 .set(0);
         }
 
-        // warn!("staging files-\n{staging_files:?}\n");
-        for (parquet_path, arrow_files) in staging_files {
-            metrics::STAGING_FILES
-                .with_label_values(&[&self.stream_name])
-                .set(arrow_files.len() as i64);
-
-            for file in &arrow_files {
-                let file_size = file.metadata().unwrap().len();
-                let file_type = file.extension().unwrap().to_str().unwrap();
+        //find sum of arrow files in staging directory for a stream
+        let total_arrow_files = staging_files.values().map(|v| v.len()).sum::<usize>();
+        metrics::STAGING_FILES
+            .with_label_values(&[&self.stream_name])
+            .set(total_arrow_files as i64);
 
-                metrics::STORAGE_SIZE
-                    .with_label_values(&["staging", &self.stream_name, file_type])
-                    .add(file_size as i64);
-            }
+        //find sum of file sizes of all arrow files in staging_files
+        let total_arrow_files_size = staging_files
+            .values()
+            .map(|v| {
+                v.iter()
+                    .map(|file| file.metadata().unwrap().len())
+                    .sum::<u64>()
+            })
+            .sum::<u64>();
+        metrics::STORAGE_SIZE
+            .with_label_values(&["staging", &self.stream_name, "arrows"])
+            .set(total_arrow_files_size as i64);
 
+        // warn!("staging files-\n{staging_files:?}\n");
+        for (parquet_path, arrow_files) in staging_files {
             let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
                 continue;
@@ -496,6 +501,7 @@
                     "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"
                 );
             }
+
             for file in arrow_files {
                 // warn!("file-\n{file:?}\n");
                 let file_size = file.metadata().unwrap().len();
@@ -655,6 +661,13 @@
     pub fn get_stream_type(&self) -> StreamType {
         self.metadata.read().expect(LOCK_EXPECT).stream_type
     }
+
+    /// First flushes arrows onto disk and then converts the arrow into parquet
+    pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+        self.flush();
+
+        self.prepare_parquet(shutdown_signal)
+    }
 }
 
 #[derive(Deref, DerefMut, Default)]
@@ -721,21 +734,22 @@ impl Streams {
         .collect()
     }
 
-    /// Convert arrow files into parquet, preparing it for upload
-    pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+    /// Asynchronously flushes arrows and compacts into parquet data on all streams in staging,
+    /// so that it is ready to be pushed onto objectstore.
+    pub fn flush_and_convert(
+        &self,
+        joinset: &mut JoinSet<Result<(), StagingError>>,
+        shutdown_signal: bool,
+    ) {
        let streams: Vec<Arc<Stream>> = self
            .read()
            .expect(LOCK_EXPECT)
            .values()
            .map(Arc::clone)
            .collect();
        for stream in streams {
-            stream
-                .prepare_parquet(shutdown_signal)
-                .inspect_err(|err| error!("Failed to run conversion task {err:?}"))?;
+            joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) });
        }
-
-        Ok(())
    }
 }
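
For intuition, a standalone sketch of the aggregate-then-set metrics shape introduced above, using a hypothetical staging map in place of real file metadata:

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical staging layout: parquet target -> sizes of its arrow files.
    let staging_files: HashMap<&str, Vec<u64>> = HashMap::from([
        ("date=2024-01-01.parquet", vec![1024, 2048]),
        ("date=2024-01-02.parquet", vec![512]),
    ]);

    // One pass for the file count (previously set per-entry inside the loop).
    let total_arrow_files = staging_files.values().map(|v| v.len()).sum::<usize>();

    // One pass for the total size, reported once under a single "arrows"
    // label instead of being added per file type.
    let total_arrow_files_size = staging_files
        .values()
        .map(|v| v.iter().sum::<u64>())
        .sum::<u64>();

    println!("files = {total_arrow_files}, bytes = {total_arrow_files_size}");
}
```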

src/query/listing_table_builder.rs

Lines changed: 2 additions & 3 deletions
@@ -32,9 +32,8 @@ use itertools::Itertools;
 use object_store::{path::Path, ObjectMeta, ObjectStore};
 
 use crate::{
-    event::DEFAULT_TIMESTAMP_KEY,
-    storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY},
-    utils::time::TimeRange,
+    event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, utils::time::TimeRange,
+    OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::PartialTimeFilter;

src/storage/mod.rs

Lines changed: 0 additions & 8 deletions
@@ -57,14 +57,6 @@ pub const SCHEMA_FILE_NAME: &str = ".schema";
 pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
 pub const MANIFEST_FILE: &str = "manifest.json";
 
-/// local sync interval to move data.records to /tmp dir of that stream.
-/// 60 sec is a reasonable value.
-pub const LOCAL_SYNC_INTERVAL: u64 = 60;
-
-/// duration used to configure prefix in objectstore and local disk structure
-/// used for storage. Defaults to 1 min.
-pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;
-
 // max concurrent request allowed for datafusion object store
 const MAX_OBJECT_STORE_REQUESTS: usize = 1000;

src/storage/object_storage.rs

Lines changed: 4 additions & 3 deletions
@@ -36,7 +36,8 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R
 use once_cell::sync::OnceCell;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
-use tracing::{debug, error, warn};
+use tracing::info;
+use tracing::{error, warn};
 use ulid::Ulid;
 
 use crate::alerts::AlertConfig;
@@ -712,13 +713,13 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> {
-        if !Path::new(&PARSEABLE.options.staging_dir()).exists() {
+        if !PARSEABLE.options.staging_dir().exists() {
             return Ok(());
         }
 
         // get all streams
         for stream_name in PARSEABLE.streams.list() {
-            debug!("Starting object_store_sync for stream- {stream_name}");
+            info!("Starting object_store_sync for stream- {stream_name}");
 
             let stream = PARSEABLE.get_or_create_stream(&stream_name);
             let custom_partition = stream.get_custom_partition();
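
A note on the `staging_dir()` tweak: a `PathBuf` (or `&Path`) already exposes `exists()`, so re-wrapping the value in `Path::new` was redundant. A minimal sketch with a stand-in `staging_dir()`:

```rust
use std::path::{Path, PathBuf};

// Stand-in for PARSEABLE.options.staging_dir().
fn staging_dir() -> PathBuf {
    PathBuf::from("/tmp/parseable-staging")
}

fn main() {
    // Before: redundant re-wrap of an existing path value.
    let redundant = Path::new(&staging_dir()).exists();
    // After: PathBuf derefs to &Path, so exists() is available directly.
    let direct = staging_dir().exists();
    assert_eq!(redundant, direct);
}
```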

Comments (0)