reject event if fields count exceed 250 #1311

Merged
11 commits merged on May 14, 2025
10 changes: 10 additions & 0 deletions src/cli.rs
@@ -35,6 +35,7 @@ use crate::{
pub const DEFAULT_USERNAME: &str = "admin";
pub const DEFAULT_PASSWORD: &str = "admin";

pub const DATASET_FIELD_COUNT_LIMIT: usize = 250;
#[derive(Parser)]
#[command(
name = "parseable",
@@ -368,6 +369,15 @@ pub struct Options {

#[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
pub ms_clarity_tag: Option<String>,

#[arg(
long,
env = "P_DATASET_FIELD_COUNT_LIMIT",
default_value_t = DATASET_FIELD_COUNT_LIMIT,
value_parser = validation::validate_dataset_fields_allowed_limit,
help = "total number of fields recommended in a dataset"
)]
pub dataset_fields_allowed_limit: usize,
}

#[derive(Parser, Debug)]
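For context, a minimal standalone sketch (not Parseable code; it assumes clap with the derive and env features, and the struct/flag names are illustrative) of how an option declared this way resolves its value: an explicit command-line flag wins, then the P_DATASET_FIELD_COUNT_LIMIT environment variable, then the compiled-in default of 250, with the value parser applied to both flag and env input.

```rust
// Minimal sketch, not Parseable's CLI. Requires clap with the `derive` and `env` features.
use clap::Parser;

const DATASET_FIELD_COUNT_LIMIT: usize = 250;

// Same shape as the validator added in src/option.rs below.
fn parse_limit(s: &str) -> Result<usize, String> {
    s.parse::<usize>()
        .ok()
        .filter(|n| (1..=DATASET_FIELD_COUNT_LIMIT).contains(n))
        .ok_or_else(|| format!("expected an integer between 1 and {DATASET_FIELD_COUNT_LIMIT}"))
}

#[derive(Parser)]
struct Demo {
    // clap derives the long flag `--dataset-fields-allowed-limit` from the field name.
    #[arg(
        long,
        env = "P_DATASET_FIELD_COUNT_LIMIT",
        default_value_t = DATASET_FIELD_COUNT_LIMIT,
        value_parser = parse_limit
    )]
    dataset_fields_allowed_limit: usize,
}

fn main() {
    // `./demo`                                   -> 250 (default)
    // `P_DATASET_FIELD_COUNT_LIMIT=100 ./demo`   -> 100 (env var)
    // `./demo --dataset-fields-allowed-limit 50` -> 50  (flag wins)
    let demo = Demo::parse();
    println!("dataset_fields_allowed_limit = {}", demo.dataset_fields_allowed_limit);
}
```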
3 changes: 3 additions & 0 deletions src/handlers/http/ingest.rs
@@ -467,6 +467,8 @@ pub enum PostError {
KnownFormat(#[from] known_schema::Error),
#[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
IncorrectLogFormat(String),
#[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} for better query performance.")]
FieldsCountLimitExceeded(String, usize, usize),
}

impl actix_web::ResponseError for PostError {
@@ -495,6 +497,7 @@ impl actix_web::ResponseError for PostError {
PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
}
}

35 changes: 35 additions & 0 deletions src/handlers/http/modal/utils/ingest_utils.rs
@@ -54,6 +54,9 @@ pub async fn flatten_and_push_logs(
log_source: &LogSource,
p_custom_fields: &HashMap<String, String>,
) -> Result<(), PostError> {
// Verify the dataset fields count
verify_dataset_fields_count(stream_name)?;

match log_source {
LogSource::Kinesis => {
//custom flattening required for Amazon Kinesis
@@ -205,6 +208,38 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, String>
p_custom_fields
}

fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
let fields_count = PARSEABLE
.get_stream(stream_name)?
.get_schema()
.fields()
.len();
let dataset_fields_warn_threshold = 0.8 * PARSEABLE.options.dataset_fields_allowed_limit as f64;
// Check if the fields count exceeds the warn threshold
if fields_count > dataset_fields_warn_threshold as usize {
tracing::warn!(
"Dataset {0} has {1} fields, which exceeds the warning threshold of {2}. Ingestion will not be possible after reaching {3} fields. We recommend creating a new dataset.",
stream_name,
fields_count,
dataset_fields_warn_threshold as usize,
PARSEABLE.options.dataset_fields_allowed_limit
);
}
// Check if the fields count exceeds the limit
// Return an error if the fields count exceeds the limit
if fields_count > PARSEABLE.options.dataset_fields_allowed_limit {
let error = PostError::FieldsCountLimitExceeded(
stream_name.to_string(),
fields_count,
PARSEABLE.options.dataset_fields_allowed_limit,
);
tracing::error!("{}", error);
// Return an error if the fields count exceeds the limit
return Err(error);
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
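For clarity, a standalone sketch of the same threshold arithmetic (illustrative names, not Parseable internals): with the default limit of 250, a warning applies above 200 fields (80% of the limit) and ingestion is rejected once the count exceeds 250.

```rust
// Standalone sketch of the thresholds used above; names are illustrative.
// Returns Ok(true) when the caller should log a "nearing the limit" warning,
// Ok(false) when the count is comfortably below it, and Err(..) when ingestion
// should be rejected.
fn check_fields_count(fields_count: usize, allowed_limit: usize) -> Result<bool, String> {
    let warn_threshold = (0.8 * allowed_limit as f64) as usize;
    if fields_count > allowed_limit {
        return Err(format!(
            "{fields_count} fields exceeds the permissible limit of {allowed_limit}"
        ));
    }
    Ok(fields_count > warn_threshold)
}

#[cfg(test)]
mod threshold_tests {
    use super::*;

    #[test]
    fn default_limit_of_250_warns_above_200_and_rejects_past_250() {
        assert_eq!(check_fields_count(200, 250), Ok(false)); // at 80%: no warning yet
        assert_eq!(check_fields_count(201, 250), Ok(true)); // above 80%: warn
        assert_eq!(check_fields_count(250, 250), Ok(true)); // at the limit: still accepted
        assert!(check_fields_count(251, 250).is_err()); // past the limit: rejected
    }
}
```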
16 changes: 16 additions & 0 deletions src/option.rs
@@ -91,6 +91,7 @@ pub mod validation {
path::{Path, PathBuf},
};

use crate::cli::DATASET_FIELD_COUNT_LIMIT;
use path_clean::PathClean;

use super::{Compression, Mode};
@@ -173,4 +174,19 @@
Err("Invalid value for max disk usage. It should be given as 90.0 for 90%".to_string())
}
}

pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
if let Ok(size) = s.parse::<usize>() {
if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) {
Ok(size)
} else {
Err(format!(
"Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be between 1 and {}",
DATASET_FIELD_COUNT_LIMIT
))
}
} else {
Err("Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be given as integer value".to_string())
}
}
}
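A few illustrative unit tests for the validator above (not part of this PR; they assume placement as a sibling of the validation module in src/option.rs):

```rust
#[cfg(test)]
mod dataset_limit_validation_tests {
    use super::validation::validate_dataset_fields_allowed_limit;

    #[test]
    fn accepts_values_between_1_and_250() {
        assert_eq!(validate_dataset_fields_allowed_limit("1"), Ok(1));
        assert_eq!(validate_dataset_fields_allowed_limit("250"), Ok(250));
    }

    #[test]
    fn rejects_zero_out_of_range_and_non_integer_input() {
        assert!(validate_dataset_fields_allowed_limit("0").is_err());
        assert!(validate_dataset_fields_allowed_limit("251").is_err());
        assert!(validate_dataset_fields_allowed_limit("abc").is_err());
    }
}
```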
61 changes: 22 additions & 39 deletions src/otel/logs.rs
@@ -15,27 +15,33 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use super::otel_utils::collect_json_from_values;
use super::otel_utils::convert_epoch_nano_to_timestamp;
use super::otel_utils::insert_attributes;
use opentelemetry_proto::tonic::logs::v1::LogRecord;
use opentelemetry_proto::tonic::logs::v1::LogsData;
use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
use opentelemetry_proto::tonic::logs::v1::SeverityNumber;
use serde_json::Map;
use serde_json::Value;

use super::otel_utils::add_other_attributes_if_not_empty;
use super::otel_utils::collect_json_from_values;
use super::otel_utils::convert_epoch_nano_to_timestamp;
use super::otel_utils::insert_attributes;
use super::otel_utils::merge_attributes_in_json;

pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [
pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 16] = [
Review thread on this line:

Member: What is the use of this list now? We're defaulting to separate columns, right?

Contributor Author: This list was initially maintained to store the known field list along with the known log format name in the stream info, with the idea that the UI can use the fields list to apply quick filters.

Member: Identifying useful columns is difficult, and the chances of getting it wrong are high. In UX we're working on a way that lets the user decide what is important for them.

Contributor Author: We can create an issue and work on it in a separate PR. A similar change needs to be done where we detect the schema and add fields to stream_info, beyond OTEL.

"scope_name",
"scope_version",
"scope_log_schema_url",
"scope_dropped_attributes_count",
"resource_dropped_attributes_count",
"schema_url",
"time_unix_nano",
"observed_time_unix_nano",
"severity_number",
"severity_text",
"body",
"flags",
"log_record_dropped_attributes_count",
"span_id",
"trace_id",
"event_name",
];
/// otel log event has severity number
/// there is a mapping of severity number to severity text provided in proto
@@ -60,7 +66,6 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
/// this function is called recursively for each log record object in the otel logs
pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
let mut log_record_json: Map<String, Value> = Map::new();
let mut other_attributes = Map::new();
log_record_json.insert(
"time_unix_nano".to_string(),
Value::String(convert_epoch_nano_to_timestamp(
@@ -83,11 +88,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
log_record_json.insert(key.to_owned(), body_json[key].to_owned());
}
}
insert_attributes(
&mut log_record_json,
&log_record.attributes,
&mut other_attributes,
);
insert_attributes(&mut log_record_json, &log_record.attributes);
log_record_json.insert(
"log_record_dropped_attributes_count".to_string(),
Value::Number(log_record.dropped_attributes_count.into()),
Expand All @@ -106,9 +107,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
Value::String(hex::encode(&log_record.trace_id)),
);

// Add the `other_attributes` to the log record json
add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes);

log_record_json
}

@@ -117,18 +115,13 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
let mut vec_scope_log_json = Vec::new();
let mut scope_log_json = Map::new();
let mut other_attributes = Map::new();
if let Some(scope) = &scope_log.scope {
scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
scope_log_json.insert(
"scope_version".to_string(),
Value::String(scope.version.clone()),
);
insert_attributes(
&mut scope_log_json,
&scope.attributes,
&mut other_attributes,
);
insert_attributes(&mut scope_log_json, &scope.attributes);
scope_log_json.insert(
"scope_dropped_attributes_count".to_string(),
Value::Number(scope.dropped_attributes_count.into()),
@@ -146,26 +139,17 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
vec_scope_log_json.push(combined_json);
}

// Add the `other_attributes` to the scope log json
merge_attributes_in_json(other_attributes, &mut vec_scope_log_json);

vec_scope_log_json
}

/// this function performs the custom flattening of the otel logs
/// and returns a `Vec` of `Value::Object` of the flattened json
pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
let mut vec_otel_json = Vec::new();

for record in &message.resource_logs {
let mut resource_log_json = Map::new();
let mut other_attributes = Map::new();
if let Some(resource) = &record.resource {
insert_attributes(
&mut resource_log_json,
&resource.attributes,
&mut other_attributes,
);
insert_attributes(&mut resource_log_json, &resource.attributes);
resource_log_json.insert(
"resource_dropped_attributes_count".to_string(),
Value::Number(resource.dropped_attributes_count.into()),
@@ -176,19 +160,18 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
for scope_log in &record.scope_logs {
vec_resource_logs_json.extend(flatten_scope_log(scope_log));
}

resource_log_json.insert(
"schema_url".to_string(),
Value::String(record.schema_url.clone()),
);

for resource_logs_json in &mut vec_resource_logs_json {
resource_logs_json.extend(resource_log_json.clone());
}

// Add the `other_attributes` to the resource log json
merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json);

vec_otel_json.extend(vec_resource_logs_json);
vec_otel_json.push(Value::Object(resource_logs_json.clone()));
}
}
vec_otel_json.into_iter().map(Value::Object).collect()

vec_otel_json
}
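One way to see why the field-count guard pairs with this change: with the other_attributes bucket removed, every OTEL attribute key becomes its own top-level key in the flattened JSON, and therefore its own dataset column. A rough serde_json sketch of that effect (illustrative only, not the actual insert_attributes helper):

```rust
// Illustrative only: each attribute key becomes a separate top-level key,
// so the number of distinct attribute keys drives the dataset's field count.
use serde_json::{json, Map, Value};

fn flatten_attributes_sketch(attributes: &[(&str, Value)]) -> Map<String, Value> {
    let mut flattened = Map::new();
    flattened.insert("body".to_string(), json!("user logged in"));
    for (key, value) in attributes {
        // One top-level key per attribute, instead of a single nested map.
        flattened.insert((*key).to_string(), value.clone());
    }
    flattened
}

fn main() {
    let record = flatten_attributes_sketch(&[
        ("service.name", json!("auth")),
        ("http.status_code", json!(200)),
    ]);
    // Keys: body, service.name, http.status_code -> three columns in the dataset.
    println!("{}", Value::Object(record));
}
```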