Commit 5a2bcc1

Author: Devdutt Shenoi (committed)

refactor: extract byte_size during json deserialization

1 parent 4c1f6d8 · commit 5a2bcc1

File tree: 9 files changed, +110 -56 lines

9 files changed

+110
-56
lines changed
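
The heart of the change: instead of re-serializing the parsed JSON just to measure it (the old serde_json::to_vec(&json).unwrap().len() in push_logs), the byte size is now captured once, while the raw payload is still at hand, and threaded through as a parameter. A minimal standalone sketch of the difference (illustrative only, not code from this commit):

use serde_json::Value;

fn main() {
    let raw = br#"{ "level": "info",  "msg": "hello" }"#;

    // Before: parse, then re-serialize the Value just to learn its size.
    let json: Value = serde_json::from_slice(raw).unwrap();
    let reserialized_len = serde_json::to_vec(&json).unwrap().len();

    // After: take the length of the raw bytes at deserialization time.
    let raw_len = raw.len();

    // The two can differ (whitespace and formatting are not preserved), and
    // the raw length reflects what the client actually sent over the wire.
    println!("re-serialized: {reserialized_len} bytes, raw: {raw_len} bytes");
}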

src/connectors/kafka/processor.rs

Lines changed: 4 additions & 3 deletions

@@ -36,7 +36,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition
 pub struct ParseableSinkProcessor;
 
 impl ParseableSinkProcessor {
-    async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<u64> {
+    async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<usize> {
         let stream_name = records
             .first()
             .map(|r| r.topic.as_str())
@@ -47,10 +47,10 @@ impl ParseableSinkProcessor {
             .await?;
 
         let mut json_vec = Vec::with_capacity(records.len());
-        let mut total_payload_size = 0u64;
+        let mut total_payload_size = 0;
 
         for record in records.iter().filter_map(|r| r.payload.as_ref()) {
-            total_payload_size += record.len() as u64;
+            total_payload_size += record.len();
             if let Ok(value) = serde_json::from_slice::<Value>(record) {
                 json_vec.push(value);
             }
@@ -60,6 +60,7 @@ impl ParseableSinkProcessor {
             .get_or_create_stream(stream_name)
             .push_logs(
                 Value::Array(json_vec),
+                total_payload_size,
                 &LogSource::Custom("Kafka".to_owned()),
             )
             .await?;

src/event/format/json.rs

Lines changed: 1 addition & 1 deletion

@@ -225,7 +225,7 @@ impl EventFormat for Event {
     /// Converts a JSON event into a Parseable Event
     fn into_event(
         self,
-        origin_size: u64,
+        origin_size: usize,
         stream: &Stream,
         log_source: &LogSource,
     ) -> anyhow::Result<super::Event> {

src/event/format/mod.rs

Lines changed: 1 addition & 1 deletion

@@ -167,7 +167,7 @@ pub trait EventFormat: Sized {
 
     fn into_event(
         self,
-        origin_size: u64,
+        origin_size: usize,
         stream: &Stream,
         log_source: &LogSource,
    ) -> Result<Event, AnyError>;

src/event/mod.rs

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@ pub struct PartitionEvent {
 
 pub struct Event {
     pub origin_format: &'static str,
-    pub origin_size: u64,
+    pub origin_size: usize,
     pub is_first_event: bool,
     pub time_partition: Option<String>,
     pub partitions: HashMap<String, PartitionEvent>,

src/handlers/http/cluster/mod.rs

Lines changed: 20 additions & 20 deletions

@@ -37,7 +37,7 @@ use tracing::{error, info, warn};
 use url::Url;
 use utils::{check_liveness, to_url_string, IngestionStats, QueriedStats, StorageStats};
 
-use crate::handlers::http::ingest::ingest_internal_stream;
+use crate::event::format::LogSource;
 use crate::metrics::prom_utils::Metrics;
 use crate::parseable::PARSEABLE;
 use crate::rbac::role::model::DefaultPrivilege;
@@ -774,29 +774,29 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
     scheduler
         .every(CLUSTER_METRICS_INTERVAL_SECONDS)
         .run(move || async {
+            let internal_stream = PARSEABLE.get_or_create_stream(INTERNAL_STREAM_NAME);
             let result: Result<(), PostError> = async {
                 let cluster_metrics = fetch_cluster_metrics().await;
-                if let Ok(metrics) = cluster_metrics {
-                    if !metrics.is_empty() {
-                        info!("Cluster metrics fetched successfully from all ingestors");
-                        if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
-                            if matches!(
-                                ingest_internal_stream(
-                                    INTERNAL_STREAM_NAME.to_string(),
-                                    bytes::Bytes::from(metrics_bytes),
-                                )
-                                .await,
-                                Ok(())
-                            ) {
-                                info!("Cluster metrics successfully ingested into internal stream");
-                            } else {
-                                error!("Failed to ingest cluster metrics into internal stream");
-                            }
-                        } else {
-                            error!("Failed to serialize cluster metrics");
-                        }
+                let Ok(metrics) = cluster_metrics else {
+                    return Ok(());
+                };
+                if !metrics.is_empty() {
+                    info!("Cluster metrics fetched successfully from all ingestors");
+                    let json = serde_json::to_value(&metrics).expect("should be json serializable");
+                    let byte_size = serde_json::to_vec(&metrics).unwrap().len();
+
+                    if matches!(
+                        internal_stream
+                            .push_logs(json, byte_size, &LogSource::Pmeta)
+                            .await,
+                        Ok(())
+                    ) {
+                        info!("Cluster metrics successfully ingested into internal stream");
+                    } else {
+                        error!("Failed to ingest cluster metrics into internal stream");
                     }
                 }
+
                 Ok(())
             }
             .await;
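
The scheduler rewrite above swaps four levels of if-let nesting for an early return via let-else (stable since Rust 1.65) and hoists the stream handle out of the async block. A self-contained illustration of the pattern, independent of this crate:

fn first_even(nums: &[i32]) -> Option<i32> {
    // let-else: bind on success, diverge (return) on failure, no nesting.
    let Some(&n) = nums.iter().find(|n| *n % 2 == 0) else {
        return None;
    };
    Some(n * 10)
}

fn main() {
    assert_eq!(first_even(&[1, 3, 4]), Some(40));
    assert_eq!(first_even(&[1, 3, 5]), None);
}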

src/handlers/http/cluster/utils.rs

Lines changed: 61 additions & 3 deletions

@@ -16,11 +16,23 @@
  *
  */
 
-use crate::{handlers::http::base_path_without_preceding_slash, HTTP_CLIENT};
-use actix_web::http::header;
+use std::{future::Future, pin::Pin};
+
+use crate::{
+    handlers::http::{base_path_without_preceding_slash, MAX_EVENT_PAYLOAD_SIZE},
+    HTTP_CLIENT,
+};
+use actix_web::{
+    dev::Payload,
+    error::{ErrorPayloadTooLarge, JsonPayloadError},
+    http::header,
+    FromRequest, HttpRequest,
+};
+use bytes::BytesMut;
 use chrono::{DateTime, Utc};
+use futures::StreamExt;
 use itertools::Itertools;
-use serde::{Deserialize, Serialize};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use tracing::error;
 use url::Url;
 
@@ -248,3 +260,49 @@ pub fn to_url_string(str: String) -> String {
 
     format!("http://{}/", str)
 }
+
+pub struct JsonWithSize<T> {
+    pub json: T,
+    pub byte_size: usize,
+}
+
+impl<T: DeserializeOwned + 'static> FromRequest for JsonWithSize<T> {
+    type Error = actix_web::error::Error;
+    type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
+
+    fn from_request(_: &HttpRequest, payload: &mut Payload) -> Self::Future {
+        let limit = MAX_EVENT_PAYLOAD_SIZE;
+
+        // Take ownership of payload for async processing
+        let mut payload = payload.take();
+
+        Box::pin(async move {
+            // Buffer to collect all bytes
+            let mut body = BytesMut::new();
+            let mut byte_size = 0;
+
+            // Collect all bytes from the payload stream
+            while let Some(chunk) = payload.next().await {
+                let chunk = chunk?;
+                byte_size += chunk.len();
+
+                // Check the size limit
+                if byte_size > limit {
+                    return Err(ErrorPayloadTooLarge(byte_size).into());
+                }
+
+                // Extend our buffer with the chunk
+                body.extend_from_slice(&chunk);
+            }
+
+            // Convert the collected bytes to Bytes
+            let bytes = body.freeze();
+
+            // Deserialize the JSON payload
+            let json = serde_json::from_slice::<T>(&bytes)
+                .map_err(|e| JsonPayloadError::Deserialize(e))?;
+
+            Ok(JsonWithSize { json, byte_size })
+        })
+    }
+}
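
Since JsonWithSize<T> implements FromRequest, actix-web resolves it like any other extractor: the handler gets both the deserialized value and the exact number of body bytes read off the wire, with MAX_EVENT_PAYLOAD_SIZE enforced while the body streams in. A hedged usage sketch (handler name and route are hypothetical, not part of this commit):

use actix_web::{HttpResponse, Responder};
use serde_json::Value;

// Hypothetical handler: json arrives already deserialized, and byte_size is
// the raw payload length measured during streaming, so no re-serialization
// is needed for size accounting.
async fn demo(JsonWithSize { json, byte_size }: JsonWithSize<Value>) -> impl Responder {
    HttpResponse::Ok().body(format!("received {byte_size} bytes: {json}"))
}

// Mounted like any handler: App::new().route("/demo", web::post().to(demo))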

src/handlers/http/ingest.rs

Lines changed: 15 additions & 23 deletions

@@ -18,10 +18,9 @@
 
 use std::collections::HashMap;
 
-use actix_web::web::{Json, Path};
+use actix_web::web::Path;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
 use arrow_array::RecordBatch;
-use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
 use serde_json::Value;
@@ -36,14 +35,18 @@ use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
 use crate::utils::json::flatten::JsonFlattenError;
 
+use super::cluster::utils::JsonWithSize;
 use super::logstream::error::{CreateStreamError, StreamError};
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
 
 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
-pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpResponse, PostError> {
+pub async fn ingest(
+    req: HttpRequest,
+    JsonWithSize { json, byte_size }: JsonWithSize<Value>,
+) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
     };
@@ -72,29 +75,18 @@ pub async fn ingest(
 
     PARSEABLE
         .get_or_create_stream(&stream_name)
-        .push_logs(json, &log_source)
+        .push_logs(json, byte_size, &log_source)
         .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
 
-pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
-    let json: Value = serde_json::from_slice(&body)?;
-
-    PARSEABLE
-        .get_stream(&stream_name)?
-        .push_logs(json, &LogSource::Pmeta)
-        .await?;
-
-    Ok(())
-}
-
 // Handler for POST /v1/logs to ingest OTEL logs
 // ingests events by extracting stream name from header
 // creates if stream does not exist
 pub async fn handle_otel_logs_ingestion(
     req: HttpRequest,
-    Json(json): Json<Value>,
+    JsonWithSize { json, byte_size }: JsonWithSize<Value>,
 ) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -115,7 +107,7 @@ pub async fn handle_otel_logs_ingestion(
 
     PARSEABLE
         .get_or_create_stream(&stream_name)
-        .push_logs(json, &log_source)
+        .push_logs(json, byte_size, &log_source)
         .await?;
 
     Ok(HttpResponse::Ok().finish())
@@ -126,7 +118,7 @@ pub async fn handle_otel_logs_ingestion(
 // creates if stream does not exist
 pub async fn handle_otel_metrics_ingestion(
     req: HttpRequest,
-    Json(json): Json<Value>,
+    JsonWithSize { json, byte_size }: JsonWithSize<Value>,
 ) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -149,7 +141,7 @@ pub async fn handle_otel_metrics_ingestion(
 
     PARSEABLE
         .get_or_create_stream(&stream_name)
-        .push_logs(json, &log_source)
+        .push_logs(json, byte_size, &log_source)
         .await?;
 
     Ok(HttpResponse::Ok().finish())
@@ -160,7 +152,7 @@ pub async fn handle_otel_metrics_ingestion(
 // creates if stream does not exist
 pub async fn handle_otel_traces_ingestion(
     req: HttpRequest,
-    Json(json): Json<Value>,
+    JsonWithSize { json, byte_size }: JsonWithSize<Value>,
 ) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -180,7 +172,7 @@ pub async fn handle_otel_traces_ingestion(
 
     PARSEABLE
         .get_or_create_stream(&stream_name)
-        .push_logs(json, &log_source)
+        .push_logs(json, byte_size, &log_source)
         .await?;
 
     Ok(HttpResponse::Ok().finish())
@@ -192,7 +184,7 @@ pub async fn handle_otel_traces_ingestion(
 pub async fn post_event(
     req: HttpRequest,
     stream_name: Path<String>,
-    Json(json): Json<Value>,
+    JsonWithSize { json, byte_size }: JsonWithSize<Value>,
 ) -> Result<HttpResponse, PostError> {
     let stream_name = stream_name.into_inner();
 
@@ -232,7 +224,7 @@ pub async fn post_event(
 
     PARSEABLE
         .get_or_create_stream(&stream_name)
-        .push_logs(json, &log_source)
+        .push_logs(json, byte_size, &log_source)
         .await?;
 
     Ok(HttpResponse::Ok().finish())

src/metadata.rs

Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ use crate::storage::StreamType;
 pub fn update_stats(
     stream_name: &str,
     origin: &'static str,
-    size: u64,
+    size: usize,
     num_rows: usize,
     parsed_date: NaiveDate,
 ) {

src/parseable/streams.rs

Lines changed: 6 additions & 3 deletions

@@ -113,9 +113,12 @@ impl Stream {
         })
     }
 
-    pub async fn push_logs(&self, json: Value, log_source: &LogSource) -> anyhow::Result<()> {
-        let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
-
+    pub async fn push_logs(
+        &self,
+        json: Value,
+        origin_size: usize,
+        log_source: &LogSource,
+    ) -> anyhow::Result<()> {
         json::Event::new(json)
             .into_event(origin_size, self, log_source)?
             .process(self)?;
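
With origin_size now a parameter, any caller that still holds the raw bytes passes their length directly instead of paying for a second serialization. A hedged call-site sketch (stream name and log source label are placeholders, not from the diff):

use crate::event::format::LogSource;
use crate::parseable::PARSEABLE;

// Hypothetical caller: parse once, reuse the raw length as origin_size.
async fn push_raw(raw: &[u8]) -> anyhow::Result<()> {
    let json: serde_json::Value = serde_json::from_slice(raw)?;
    PARSEABLE
        .get_or_create_stream("demo-stream")
        .push_logs(json, raw.len(), &LogSource::Custom("demo".to_owned()))
        .await
}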
