
Commit dff2fb4

Merge branch 'main' into static_schema
2 parents: a3fb21d + b651864

14 files changed: 185 additions & 202 deletions

src/connectors/kafka/processor.rs

Lines changed: 1 addition & 0 deletions
@@ -68,6 +68,7 @@ impl ParseableSinkProcessor {
 
         let (rb, is_first) = batch_json_event.into_recordbatch(
             &schema,
+            Utc::now(),
             static_schema_flag,
             time_partition.as_ref(),
             schema_version,

src/event/format/mod.rs

Lines changed: 3 additions & 2 deletions
@@ -26,7 +26,7 @@ use std::{
 use anyhow::{anyhow, Error as AnyError};
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::DateTime;
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
@@ -108,6 +108,7 @@ pub trait EventFormat: Sized {
     fn into_recordbatch(
         self,
         storage_schema: &HashMap<String, Arc<Field>>,
+        p_timestamp: DateTime<Utc>,
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
@@ -145,7 +146,7 @@ pub trait EventFormat: Sized {
             rb.schema(),
             &rb,
             &[0],
-            &[Arc::new(get_timestamp_array(rb.num_rows()))],
+            &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
         );
 
         Ok((rb, is_first))
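
With this change the timestamp for the p_timestamp column is supplied by the caller rather than being generated inside get_timestamp_array, so every event in one request can share a single capture time. A rough sketch of the idea, assuming the column is a millisecond-precision arrow timestamp (the actual get_timestamp_array helper is not shown in this diff):

    use arrow_array::{Array, TimestampMillisecondArray};
    use chrono::{DateTime, Utc};

    // Assumed shape of the helper: one identical timestamp per row, taken from
    // the caller-supplied p_timestamp instead of a fresh Utc::now() inside it.
    fn get_timestamp_array(p_timestamp: DateTime<Utc>, rows: usize) -> TimestampMillisecondArray {
        TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), rows)
    }

    fn main() {
        let arr = get_timestamp_array(Utc::now(), 3);
        assert_eq!(arr.len(), 3);
    }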

src/handlers/http/health_check.rs

Lines changed: 24 additions & 2 deletions
@@ -27,7 +27,8 @@ use actix_web::{
     HttpResponse,
 };
 use http::StatusCode;
-use tokio::sync::Mutex;
+use tokio::{sync::Mutex, task::JoinSet};
+use tracing::{error, info, warn};
 
 use crate::parseable::PARSEABLE;
 
@@ -60,8 +61,29 @@ pub async fn shutdown() {
     let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
     *shutdown_flag = true;
 
+    let mut joinset = JoinSet::new();
+
     // Sync staging
-    PARSEABLE.flush_all_streams();
+    PARSEABLE.streams.flush_and_convert(&mut joinset, true);
+
+    while let Some(res) = joinset.join_next().await {
+        match res {
+            Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
+            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+            Err(err) => error!("Failed to join async task: {err}"),
+        }
+    }
+
+    if let Err(e) = PARSEABLE
+        .storage
+        .get_object_store()
+        .upload_files_from_staging()
+        .await
+    {
+        warn!("Failed to sync local data with object store. {:?}", e);
+    } else {
+        info!("Successfully synced all data to S3.");
+    }
 }
 
 pub async fn readiness() -> HttpResponse {
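
Shutdown now waits for every arrow-to-parquet conversion to finish before uploading, instead of firing a flush and returning. A minimal, self-contained sketch of the JoinSet drain pattern used here; the stream names and the conversion body are illustrative stand-ins, not Parseable APIs:

    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() {
        let mut joinset: JoinSet<anyhow::Result<()>> = JoinSet::new();
        for stream in ["app-logs", "audit"] {
            // stand-in for one stream's arrow-to-parquet conversion job
            joinset.spawn(async move {
                println!("converting {stream}");
                Ok(())
            });
        }
        // drain the set: nothing past this loop runs until every task has finished
        while let Some(res) = joinset.join_next().await {
            match res {
                Ok(Ok(())) => println!("conversion finished"),
                Ok(Err(err)) => eprintln!("conversion failed: {err:?}"),
                Err(err) => eprintln!("task join failed: {err}"),
            }
        }
    }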

src/handlers/http/ingest.rs

Lines changed: 54 additions & 16 deletions
@@ -79,7 +79,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
     let size: usize = body.len();
-    let parsed_timestamp = Utc::now().naive_utc();
+    let now = Utc::now();
     let (rb, is_first) = {
         let body_val: Value = serde_json::from_slice(&body)?;
         let hash_map = PARSEABLE.streams.read().unwrap();
@@ -93,15 +93,15 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
             .clone();
         let event = format::json::Event { data: body_val };
         // For internal streams, use old schema
-        event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
+        event.into_recordbatch(&schema, now, false, None, SchemaVersion::V0)?
     };
     event::Event {
         rb,
         stream_name,
         origin_format: "json",
         origin_size: size as u64,
         is_first_event: is_first,
-        parsed_timestamp,
+        parsed_timestamp: now.naive_utc(),
         time_partition: None,
         custom_partition_values: HashMap::new(),
         stream_type: StreamType::Internal,
@@ -351,6 +351,7 @@ mod tests {
     use arrow::datatypes::Int64Type;
    use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
     use arrow_schema::{DataType, Field};
+    use chrono::Utc;
     use serde_json::json;
     use std::{collections::HashMap, sync::Arc};
 
@@ -392,8 +393,15 @@
             "b": "hello",
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 4);
@@ -419,8 +427,15 @@
             "c": null
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -450,7 +465,8 @@
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -480,7 +496,9 @@
             .into_iter(),
         );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
+        );
     }
 
     #[test]
@@ -496,7 +514,8 @@
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 1);
@@ -535,8 +554,15 @@
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -582,8 +608,15 @@
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -630,7 +663,8 @@
             .into_iter(),
        );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -677,7 +711,9 @@
             .into_iter(),
        );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
+        );
     }
 
     #[test]
@@ -718,6 +754,7 @@
         let (rb, _) = into_event_batch(
             flattened_json,
             HashMap::default(),
+            Utc::now(),
             false,
             None,
             SchemaVersion::V0,
@@ -806,6 +843,7 @@
         let (rb, _) = into_event_batch(
             flattened_json,
             HashMap::default(),
+            Utc::now(),
             false,
             None,
             SchemaVersion::V1,
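
The internal-stream path now captures Utc::now() once and derives both timestamps from it, where the old code took one reading for parsed_timestamp and (presumably) another inside into_recordbatch for the p_timestamp column. A tiny sketch of why the single capture matters:

    use chrono::Utc;

    fn main() {
        let now = Utc::now();
        let for_recordbatch = now;              // forwarded to into_recordbatch
        let parsed_timestamp = now.naive_utc(); // stored on event::Event
        // both views come from the same instant, so they can never disagree
        assert_eq!(for_recordbatch.naive_utc(), parsed_timestamp);
    }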

src/handlers/http/modal/mod.rs

Lines changed: 0 additions & 20 deletions
@@ -136,26 +136,6 @@ pub trait ParseableServer {
 
         health_check::shutdown().await;
 
-        // Perform S3 sync and wait for completion
-        info!("Starting data sync to S3...");
-
-        if let Err(e) = PARSEABLE.streams.prepare_parquet(true) {
-            warn!("Failed to convert arrow files to parquet. {:?}", e);
-        } else {
-            info!("Successfully converted arrow files to parquet.");
-        }
-
-        if let Err(e) = PARSEABLE
-            .storage
-            .get_object_store()
-            .upload_files_from_staging()
-            .await
-        {
-            warn!("Failed to sync local data with object store. {:?}", e);
-        } else {
-            info!("Successfully synced all data to S3.");
-        }
-
         // Initiate graceful shutdown
         info!("Graceful shutdown of HTTP server triggered");
         srv_handle.stop(true).await;

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 5 additions & 1 deletion
@@ -96,6 +96,7 @@ async fn push_logs(
     let static_schema_flag = stream.get_static_schema_flag();
     let custom_partition = stream.get_custom_partition();
     let schema_version = stream.get_schema_version();
+    let p_timestamp = Utc::now();
 
     let data = if time_partition.is_some() || custom_partition.is_some() {
         convert_array_to_object(
@@ -121,7 +122,7 @@
         let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
         let parsed_timestamp = match time_partition.as_ref() {
             Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
-            _ => Utc::now().naive_utc(),
+            _ => p_timestamp.naive_utc(),
         };
         let custom_partition_values = match custom_partition.as_ref() {
             Some(custom_partition) => {
@@ -144,6 +145,7 @@
         let (rb, is_first_event) = into_event_batch(
             value,
             schema,
+            p_timestamp,
             static_schema_flag,
             time_partition.as_ref(),
             schema_version,
@@ -168,12 +170,14 @@
 pub fn into_event_batch(
     data: Value,
     schema: HashMap<String, Arc<Field>>,
+    p_timestamp: DateTime<Utc>,
     static_schema_flag: bool,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let (rb, is_first) = json::Event { data }.into_recordbatch(
         &schema,
+        p_timestamp,
         static_schema_flag,
         time_partition,
         schema_version,
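
push_logs now captures p_timestamp once per request and reuses it both as the fallback for parsed_timestamp and as the value threaded through into_event_batch for every event in the batch. An illustrative sketch of that fallback logic (not the crate's get_parsed_timestamp, which returns a Result and is not shown in this diff):

    use chrono::{DateTime, NaiveDateTime, Utc};
    use serde_json::Value;

    // Time-partitioned events take their timestamp from the payload field;
    // everything else falls back to the request-level p_timestamp.
    fn resolve_parsed_timestamp(
        value: &Value,
        time_partition: Option<&String>,
        p_timestamp: DateTime<Utc>,
    ) -> NaiveDateTime {
        match time_partition {
            Some(field) => value[field.as_str()]
                .as_str()
                .and_then(|s| s.parse::<DateTime<Utc>>().ok())
                .map(|dt| dt.naive_utc())
                .unwrap_or_else(|| p_timestamp.naive_utc()),
            None => p_timestamp.naive_utc(),
        }
    }

    fn main() {
        let event = serde_json::json!({ "ts": "2025-01-01T00:00:00Z" });
        let field = "ts".to_string();
        let parsed = resolve_parsed_timestamp(&event, Some(&field), Utc::now());
        println!("{parsed}");
    }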

src/lib.rs

Lines changed: 9 additions & 2 deletions
@@ -59,8 +59,15 @@ use reqwest::{Client, ClientBuilder};
 // It is very unlikely that panic will occur when dealing with locks.
 pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";
 
-pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
-pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;
+/// Describes the duration at the end of which in-memory buffers are flushed,
+/// arrows files are "finished" and compacted into parquet files.
+pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
+
+/// Duration used to configure prefix generation.
+pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;
+
+/// Describes the duration at the end of which parquets are pushed into objectstore.
+pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
 
 // A single HTTP client for all outgoing HTTP requests from the parseable server
 static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
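
The interval constants are now std::time::Duration values instead of bare u64 seconds, and the object-store prefix granularity is derived from the sync interval at compile time. A short sketch of how they compose; the tokio::time::interval consumer is an assumed usage, since the sync loops themselves are outside this diff:

    use std::time::Duration;

    const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
    // 60 s / 60 = 1, i.e. prefixes are generated at one-minute granularity
    const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;
    const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);

    #[tokio::main]
    async fn main() {
        assert_eq!(OBJECT_STORE_DATA_GRANULARITY, 1);
        // Duration-typed constants feed timers directly, with no unit conversion
        let mut upload_tick = tokio::time::interval(STORAGE_UPLOAD_INTERVAL);
        upload_tick.tick().await; // the first tick resolves immediately
    }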

0 commit comments