Commit cf09d87

Author: Devdutt Shenoi (committed)
refactor: memory/cpu efficient handling of stream configuration variables
1 parent: 523ecc7

File tree: 13 files changed (+303, -282 lines)


src/handlers/http/ingest.rs

Lines changed: 3 additions & 3 deletions
@@ -512,7 +512,7 @@ mod tests {
             json,
             None,
             None,
-            None,
+            &[],
             SchemaVersion::V0,
             &crate::event::format::LogSource::default()
         )
@@ -709,7 +709,7 @@ mod tests {
             json,
             None,
             None,
-            None,
+            &[],
             SchemaVersion::V0,
             &crate::event::format::LogSource::default(),
         )
@@ -797,7 +797,7 @@ mod tests {
             json,
             None,
             None,
-            None,
+            &[],
             SchemaVersion::V1,
             &crate::event::format::LogSource::default(),
         )

src/handlers/http/logstream.rs

Lines changed: 10 additions & 6 deletions
@@ -363,7 +363,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
         time_partition_limit: stream_meta
             .time_partition_limit
             .map(|limit| limit.to_string()),
-        custom_partition: stream_meta.custom_partition.clone(),
+        custom_partition: stream_meta.custom_partitions.clone(),
         static_schema_flag: stream_meta.static_schema_flag,
         log_source: stream_meta.log_source.clone(),
     };
@@ -485,6 +485,7 @@ pub mod error {
     use http::StatusCode;

     use crate::{
+        handlers::http::modal::utils::logstream_utils::HeaderParseError,
         hottier::HotTierError,
         parseable::StreamNotFound,
         storage::ObjectStorageError,
@@ -554,6 +555,8 @@ pub mod error {
         HotTierValidation(#[from] HotTierValidationError),
         #[error("{0}")]
         HotTierError(#[from] HotTierError),
+        #[error("Error when parsing headers: {0}")]
+        HeaderParsing(#[from] HeaderParseError),
     }

     impl actix_web::ResponseError for StreamError {
@@ -589,6 +592,7 @@ pub mod error {
             StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN,
             StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
             StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            StreamError::HeaderParsing(_) => StatusCode::BAD_REQUEST,
         }
     }

@@ -626,7 +630,7 @@ mod tests {
    #[actix_web::test]
    async fn header_without_log_source() {
        let req = TestRequest::default().to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
        assert_eq!(log_source, crate::event::format::LogSource::Json);
    }

@@ -635,19 +639,19 @@ mod tests {
        let mut req = TestRequest::default()
            .insert_header(("X-P-Log-Source", "pmeta"))
            .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
        assert_eq!(log_source, crate::event::format::LogSource::Pmeta);

        req = TestRequest::default()
            .insert_header(("X-P-Log-Source", "otel-logs"))
            .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
        assert_eq!(log_source, crate::event::format::LogSource::OtelLogs);

        req = TestRequest::default()
            .insert_header(("X-P-Log-Source", "kinesis"))
            .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
        assert_eq!(log_source, crate::event::format::LogSource::Kinesis);
    }

@@ -656,7 +660,7 @@ mod tests {
        let req = TestRequest::default()
            .insert_header(("X-P-Log-Source", "teststream"))
            .to_http_request();
-        let PutStreamHeaders { log_source, .. } = req.headers().into();
+        let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
        assert_eq!(log_source, crate::event::format::LogSource::Json);
    }
}
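
The switch from `.into()` to `.try_into()` works because `PutStreamHeaders` now implements `TryFrom<&HeaderMap>` (see src/handlers/http/modal/utils/logstream_utils.rs below), and the new `StreamError::HeaderParsing` variant maps any `HeaderParseError` to 400 Bad Request. A minimal sketch of what a negative test could look like in the same style as the cases above; it is not part of this commit, and the literal header name is only assumed from the `X-P-` convention visible in the existing tests:

#[actix_web::test]
async fn header_with_too_many_custom_partitions() {
    // Header name assumed from the X-P- convention; CUSTOM_PARTITION_KEY in the
    // crate is the source of truth. Four comma-separated keys exceed the limit
    // of three enforced by parse_custom_partition, so the conversion should fail.
    let req = TestRequest::default()
        .insert_header(("X-P-Custom-Partition", "a,b,c,d"))
        .to_http_request();
    let parsed: Result<PutStreamHeaders, _> = req.headers().try_into();
    assert!(parsed.is_err());
}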

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 6 additions & 13 deletions
@@ -18,7 +18,6 @@

 use arrow_schema::Field;
 use chrono::{DateTime, NaiveDateTime, Utc};
-use itertools::Itertools;
 use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};

@@ -72,15 +71,15 @@ pub async fn push_logs(
         .get_stream(stream_name)?
         .get_time_partition_limit();
     let static_schema_flag = stream.get_static_schema_flag();
-    let custom_partition = stream.get_custom_partition();
+    let custom_partitions = stream.get_custom_partitions();
     let schema_version = stream.get_schema_version();

-    let data = if time_partition.is_some() || custom_partition.is_some() {
+    let data = if time_partition.is_some() || !custom_partitions.is_empty() {
         convert_array_to_object(
             json,
             time_partition.as_ref(),
             time_partition_limit,
-            custom_partition.as_ref(),
+            &custom_partitions,
             schema_version,
             log_source,
         )?
@@ -89,7 +88,7 @@ pub async fn push_logs(
             json,
             None,
             None,
-            None,
+            &[],
             schema_version,
             log_source,
         )?)?]
@@ -101,13 +100,7 @@ pub async fn push_logs(
             Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
             _ => Utc::now().naive_utc(),
         };
-        let custom_partition_values = match custom_partition.as_ref() {
-            Some(custom_partition) => {
-                let custom_partitions = custom_partition.split(',').collect_vec();
-                get_custom_partition_values(&value, &custom_partitions)
-            }
-            None => HashMap::new(),
-        };
+        let custom_partition_values = get_custom_partition_values(&value, &custom_partitions);
         let schema = PARSEABLE
             .streams
             .read()
@@ -162,7 +155,7 @@ pub fn into_event_batch(

 pub fn get_custom_partition_values(
     json: &Value,
-    custom_partition_list: &[&str],
+    custom_partition_list: &[String],
 ) -> HashMap<String, String> {
     let mut custom_partition_values: HashMap<String, String> = HashMap::new();
     for custom_partition_field in custom_partition_list {
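
With `custom_partition_list: &[String]`, the per-event path no longer re-splits a comma-separated string on every record; the already-parsed list held in stream metadata is simply borrowed as a slice. A standalone sketch of how such a slice can drive value extraction from a flattened JSON event; only the `&[String]` signature comes from this diff, while the body and output format are assumptions for illustration:

// Standalone sketch: only the `&[String]` signature is taken from this commit;
// the body and the quote-trimming are assumptions, not the crate's implementation.
use std::collections::HashMap;
use serde_json::{json, Value};

fn get_custom_partition_values(
    json: &Value,
    custom_partition_list: &[String],
) -> HashMap<String, String> {
    let mut values = HashMap::new();
    for field in custom_partition_list {
        if let Some(v) = json.get(field.as_str()) {
            // `Value::to_string` wraps strings in quotes; strip them for a clean value.
            values.insert(field.clone(), v.to_string().trim_matches('"').to_owned());
        }
    }
    values
}

fn main() {
    let event = json!({ "os": "linux", "region": "us-east-1", "latency_ms": 42 });
    let partitions = vec!["os".to_owned(), "region".to_owned()];
    // Prints something like {"os": "linux", "region": "us-east-1"}.
    println!("{:?}", get_custom_partition_values(&event, &partitions));
}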

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 76 additions & 18 deletions
@@ -16,6 +16,8 @@
  *
  */

+use std::num::NonZeroU32;
+
 use actix_web::http::header::HeaderMap;

 use crate::{
@@ -27,31 +29,57 @@ use crate::{
     storage::StreamType,
 };

+/// Name of a field that appears within a data stream
+pub type FieldName = String;
+
+/// Name of the field used as a custom partition
+pub type CustomPartition = String;
+
+#[derive(Debug, thiserror::Error)]
+pub enum HeaderParseError {
+    #[error("Maximum 3 custom partition keys are supported")]
+    TooManyPartitions,
+    #[error("Missing 'd' suffix for duration value")]
+    UnsupportedUnit,
+    #[error("Could not convert duration to an unsigned number")]
+    ZeroOrNegative,
+}
+
 #[derive(Debug, Default)]
 pub struct PutStreamHeaders {
-    pub time_partition: String,
-    pub time_partition_limit: String,
-    pub custom_partition: Option<String>,
+    pub time_partition: Option<FieldName>,
+    pub time_partition_limit: Option<NonZeroU32>,
+    pub custom_partitions: Vec<String>,
     pub static_schema_flag: bool,
     pub update_stream_flag: bool,
     pub stream_type: StreamType,
     pub log_source: LogSource,
 }

-impl From<&HeaderMap> for PutStreamHeaders {
-    fn from(headers: &HeaderMap) -> Self {
-        PutStreamHeaders {
-            time_partition: headers
-                .get(TIME_PARTITION_KEY)
-                .map_or("", |v| v.to_str().unwrap())
-                .to_string(),
-            time_partition_limit: headers
-                .get(TIME_PARTITION_LIMIT_KEY)
-                .map_or("", |v| v.to_str().unwrap())
-                .to_string(),
-            custom_partition: headers
-                .get(CUSTOM_PARTITION_KEY)
-                .map(|v| v.to_str().unwrap().to_string()),
+impl TryFrom<&HeaderMap> for PutStreamHeaders {
+    type Error = HeaderParseError;
+
+    fn try_from(headers: &HeaderMap) -> Result<Self, Self::Error> {
+        let time_partition = headers
+            .get(TIME_PARTITION_KEY)
+            .map(|v| v.to_str().unwrap().to_owned());
+        let time_partition_limit = match headers
+            .get(TIME_PARTITION_LIMIT_KEY)
+            .map(|v| v.to_str().unwrap())
+        {
+            Some(limit) => Some(parse_time_partition_limit(limit)?),
+            None => None,
+        };
+        let custom_partition = headers
+            .get(CUSTOM_PARTITION_KEY)
+            .map(|v| v.to_str().unwrap())
+            .unwrap_or_default();
+        let custom_partitions = parse_custom_partition(custom_partition)?;
+
+        let headers = PutStreamHeaders {
+            time_partition,
+            time_partition_limit,
+            custom_partitions,
             static_schema_flag: headers
                 .get(STATIC_SCHEMA_FLAG)
                 .is_some_and(|v| v.to_str().unwrap() == "true"),
@@ -65,6 +93,36 @@ impl From<&HeaderMap> for PutStreamHeaders {
             log_source: headers
                 .get(LOG_SOURCE_KEY)
                 .map_or(LogSource::default(), |v| v.to_str().unwrap().into()),
-        }
+        };
+
+        Ok(headers)
     }
 }
+
+pub fn parse_custom_partition(
+    custom_partition: &str,
+) -> Result<Vec<CustomPartition>, HeaderParseError> {
+    let custom_partition_list = custom_partition
+        .split(',')
+        .map(String::from)
+        .collect::<Vec<String>>();
+    if custom_partition_list.len() > 3 {
+        return Err(HeaderParseError::TooManyPartitions);
+    }
+
+    Ok(custom_partition_list)
+}
+
+pub fn parse_time_partition_limit(
+    time_partition_limit: &str,
+) -> Result<NonZeroU32, HeaderParseError> {
+    if !time_partition_limit.ends_with('d') {
+        return Err(HeaderParseError::UnsupportedUnit);
+    }
+    let days = &time_partition_limit[0..time_partition_limit.len() - 1];
+    let Ok(days) = days.parse::<NonZeroU32>() else {
+        return Err(HeaderParseError::ZeroOrNegative);
+    };
+
+    Ok(days)
+}
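
The two helpers encode the validation rules directly: a time-partition limit must be a positive day count with a 'd' suffix, and at most three custom partition keys are accepted. A small usage sketch, assuming the helpers are in scope (for example in this module's tests); the asserted outcomes follow from the code above:

use std::num::NonZeroU32;

#[test]
fn parse_helpers_follow_the_rules_above() {
    // Assumes parse_time_partition_limit / parse_custom_partition are in scope.
    // "30d" -> 30 days; the 'd' suffix is mandatory and zero is rejected.
    assert_eq!(
        parse_time_partition_limit("30d").unwrap(),
        NonZeroU32::new(30).unwrap()
    );
    assert!(parse_time_partition_limit("30").is_err()); // UnsupportedUnit
    assert!(parse_time_partition_limit("0d").is_err()); // ZeroOrNegative

    // Up to three comma-separated keys are accepted; a fourth is an error.
    assert_eq!(parse_custom_partition("os,region").unwrap().len(), 2);
    assert!(parse_custom_partition("a,b,c,d").is_err()); // TooManyPartitions
}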

src/metadata.rs

Lines changed: 5 additions & 9 deletions
@@ -83,7 +83,7 @@ pub struct LogStreamMetadata {
     pub first_event_at: Option<String>,
     pub time_partition: Option<String>,
     pub time_partition_limit: Option<NonZeroU32>,
-    pub custom_partition: Option<String>,
+    pub custom_partitions: Vec<String>,
     pub static_schema_flag: bool,
     pub hot_tier_enabled: bool,
     pub stream_type: StreamType,
@@ -94,9 +94,9 @@ impl LogStreamMetadata {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         created_at: String,
-        time_partition: String,
+        time_partition: Option<String>,
         time_partition_limit: Option<NonZeroU32>,
-        custom_partition: Option<String>,
+        custom_partitions: Vec<String>,
         static_schema_flag: bool,
         static_schema: HashMap<String, Arc<Field>>,
         stream_type: StreamType,
@@ -109,13 +109,9 @@ impl LogStreamMetadata {
             } else {
                 created_at
             },
-            time_partition: if time_partition.is_empty() {
-                None
-            } else {
-                Some(time_partition)
-            },
+            time_partition,
             time_partition_limit,
-            custom_partition,
+            custom_partitions,
             static_schema_flag,
             schema: if static_schema.is_empty() {
                 HashMap::new()
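
The constructor no longer maps an empty `String` sentinel to `None`; callers pass `Option<String>` directly, which removes the emptiness check and keeps the "not set" case explicit in the type. A standalone illustration of the before/after pattern; the function names here are generic stand-ins, not the crate's APIs:

// Generic stand-ins for illustration only; not the crate's functions.
// Before: an empty String doubles as "not set" and has to be checked at use.
fn from_sentinel(time_partition: String) -> Option<String> {
    if time_partition.is_empty() {
        None
    } else {
        Some(time_partition)
    }
}

// After: absence is already expressed as Option by the caller, so the value
// is stored as-is with no sentinel check.
fn from_option(time_partition: Option<String>) -> Option<String> {
    time_partition
}

fn main() {
    assert_eq!(from_sentinel(String::new()), None);
    assert_eq!(from_option(None), None);
    assert_eq!(from_option(Some("timestamp".into())), Some("timestamp".to_owned()));
}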

src/migration/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -272,7 +272,7 @@ async fn migration_stream(
         stats,
         time_partition,
         time_partition_limit,
-        custom_partition,
+        custom_partitions,
         static_schema_flag,
         hot_tier_enabled,
         stream_type,
@@ -307,7 +307,7 @@ async fn migration_stream(
         first_event_at,
         time_partition,
         time_partition_limit: time_partition_limit.and_then(|limit| limit.parse().ok()),
-        custom_partition,
+        custom_partitions,
         static_schema_flag,
         hot_tier_enabled,
         stream_type,
