Commit d6ca844

Merge remote-tracking branch 'origin/main' into static-schema

Author: Devdutt Shenoi
Parents: 3e9bb84 + 7d9b9ab

38 files changed: +1592 −374 lines

CONTRIBUTING.md

Lines changed: 68 additions & 0 deletions
### Contributing

This document outlines how you can contribute to Parseable.

Thank you for considering contributing to Parseable. The goal of this document is to provide everything you need to start your contribution. We encourage all contributions, including but not limited to:

- Code patches, bug fixes.
- Tutorials or blog posts.
- Improving the documentation.
- Submitting [bug reports](https://github.com/parseablehq/parseable/issues/new).

### Prerequisites

- Your PR has an associated issue. Find an [existing issue](https://github.com/parseablehq/parseable/issues) or open a new one.
- You've discussed the change with the [Parseable community](https://logg.ing/community).
- You're familiar with the GitHub Pull Request (PR) workflow.
- You've read the [Parseable documentation](https://www.parseable.com/docs).

### Contribution workflow

- Fork the [Parseable repository](https://help.github.com/en/github/getting-started-with-github/fork-a-repo) into your own GitHub account.
- [Create a new branch](https://help.github.com/en/github/collaborating-with-issues-and-pull-requests/creating-and-deleting-branches-within-your-repository); a typical command sequence is sketched after this list.
- Review the Development Workflow section below, which describes the steps to maintain the repository.
- Make your changes on your branch.
- Submit the branch as a Pull Request pointing to the main branch of the Parseable repository. A maintainer should comment on and/or review your Pull Request within a few hours.
- You'll be asked to review and sign the [Parseable Contributor License Agreement (CLA)](https://github.com/parseablehq/.github/blob/main/CLA.md) on the GitHub PR. Please ensure that you review the document. Once you're ready, sign the CLA by adding the comment "I have read the CLA Document and I hereby sign the CLA" in your Pull Request.
- Please ensure that all commits in your Pull Request are made by the same GitHub user that created the PR and added the comment.
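For reference, a minimal fork-and-branch sequence for the steps above; the fork URL and branch name are illustrative placeholders:

```sh
# Clone your fork and create a topic branch (names are illustrative)
git clone https://github.com/<your-username>/parseable.git
cd parseable
git checkout -b fix-issue-123

# Commit and push your work, then open a PR against parseablehq/parseable:main
git add -A
git commit -m "fix: short description of the change"
git push origin fix-issue-123
```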
### Development workflow

We recommend Linux or macOS as the development platform for Parseable.

#### Setup and run Parseable

Parseable needs Rust 1.77.1 or above. Use the command below to build and run the Parseable binary in local mode.

```sh
cargo run --release local-store
```

We recommend using the `--release` flag to test the full performance of Parseable.

#### Running Tests

```sh
cargo test
```

This command is triggered on each PR as a requirement for merging it.

If you get a "Too many open files" error, you might want to increase the open file limit using this command:

```sh
ulimit -Sn 3000
```

If you get an OpenSSL-related error while building Parseable, you might need to install the dependencies using these commands:

```sh
sudo apt install build-essential
sudo apt-get install libssl-dev
sudo apt-get install pkg-config
```

### Git Guidelines

- The PR title should be accurate and descriptive of the changes.
- Draft PRs are recommended when you want to show that you are working on something and make your work visible. Convert your PR to a draft if your changes are a work in progress. Maintainers will review the PR once you mark it as ready for review.
- The branch related to the PR must be up to date with main before merging. We use Bors to enforce this requirement automatically, without the PR author having to rebase manually.

Cargo.lock

Lines changed: 2 additions & 9 deletions
Some generated files are not rendered by default.

Cross.toml

Lines changed: 5 additions & 0 deletions
```toml
[target.aarch64-unknown-linux-gnu]
image = "ghcr.io/cross-rs/aarch64-unknown-linux-gnu@sha256:1e2a0291f92a4372cbc22d8994e735473045383f1ce7fa44a16c234ba00187f4"

[target.x86_64-unknown-linux-gnu]
image = "ghcr.io/cross-rs/x86_64-unknown-linux-gnu@sha256:bf05360bb9d6d4947eed60532ac7a0d7e8fae8f214e9abb801d5941c8fe4918d"
```

src/alerts/alerts_utils.rs

Lines changed: 3 additions & 1 deletion
```diff
@@ -31,6 +31,7 @@ use tracing::trace;

 use crate::{
     alerts::AggregateCondition,
+    parseable::PARSEABLE,
     query::{TableScanVisitor, QUERY_SESSION},
     rbac::{
         map::SessionKey,
@@ -137,8 +138,9 @@ async fn execute_base_query(
         AlertError::CustomError(format!("Table name not found in query- {}", original_query))
     })?;

+    let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
     query
-        .get_dataframe(stream_name)
+        .get_dataframe(time_partition.as_ref())
         .await
         .map_err(|err| AlertError::CustomError(err.to_string()))
 }
```

src/alerts/mod.rs

Lines changed: 57 additions & 5 deletions
```diff
@@ -24,6 +24,7 @@ use datafusion::common::tree_node::TreeNode;
 use http::StatusCode;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
+use serde::Serialize;
 use serde_json::Error as SerdeError;
 use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Display};
@@ -36,7 +37,7 @@ use ulid::Ulid;
 pub mod alerts_utils;
 pub mod target;

-use crate::parseable::PARSEABLE;
+use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::map::SessionKey;
 use crate::storage;
@@ -513,17 +514,16 @@ impl AlertConfig {

         // for now proceed in a similar fashion as we do in query
         // TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
-        let stream_name = if let Some(stream_name) = query.first_table_name() {
-            stream_name
-        } else {
+        let Some(stream_name) = query.first_table_name() else {
             return Err(AlertError::CustomError(format!(
                 "Table name not found in query- {}",
                 self.query
             )));
         };

+        let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
         let base_df = query
-            .get_dataframe(stream_name)
+            .get_dataframe(time_partition.as_ref())
             .await
             .map_err(|err| AlertError::CustomError(err.to_string()))?;

@@ -703,6 +703,8 @@ pub enum AlertError {
     CustomError(String),
     #[error("Invalid State Change: {0}")]
     InvalidStateChange(String),
+    #[error("{0}")]
+    StreamNotFound(#[from] StreamNotFound),
 }

 impl actix_web::ResponseError for AlertError {
@@ -716,6 +718,7 @@ impl actix_web::ResponseError for AlertError {
             Self::DatafusionError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             Self::CustomError(_) => StatusCode::BAD_REQUEST,
             Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
+            Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
         }
     }

@@ -873,3 +876,52 @@ impl Alerts {
         Ok(())
     }
 }
+
+#[derive(Debug, Serialize)]
+pub struct AlertsInfo {
+    total: u64,
+    silenced: u64,
+    resolved: u64,
+    triggered: u64,
+    low: u64,
+    medium: u64,
+    high: u64,
+}
+
+// TODO: add RBAC
+pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
+    let alerts = ALERTS.alerts.read().await;
+    let mut total = 0;
+    let mut silenced = 0;
+    let mut resolved = 0;
+    let mut triggered = 0;
+    let mut low = 0;
+    let mut medium = 0;
+    let mut high = 0;
+
+    for (_, alert) in alerts.iter() {
+        total += 1;
+        match alert.state {
+            AlertState::Silenced => silenced += 1,
+            AlertState::Resolved => resolved += 1,
+            AlertState::Triggered => triggered += 1,
+        }
+
+        match alert.severity {
+            Severity::Low => low += 1,
+            Severity::Medium => medium += 1,
+            Severity::High => high += 1,
+            _ => {}
+        }
+    }
+
+    Ok(AlertsInfo {
+        total,
+        silenced,
+        resolved,
+        triggered,
+        low,
+        medium,
+        high,
+    })
+}
```

src/cli.rs

Lines changed: 3 additions & 2 deletions
```diff
@@ -260,11 +260,12 @@ pub struct Options {
         help = "Set a fixed memory limit for query in GiB"
     )]
     pub query_memory_pool_size: Option<usize>,
-
+    // reduced the max row group size from 1048576
+    // smaller row groups help in faster query performance in multi threaded query
    #[arg(
        long,
        env = "P_PARQUET_ROW_GROUP_SIZE",
-        default_value = "1048576",
+        default_value = "262144",
        help = "Number of rows in a row group"
    )]
    pub row_group_size: usize,
```
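The smaller default (262144 rows) trades some compression efficiency for parallelism, since each Parquet row group can be scanned by a separate thread. Deployments can still override it through the `P_PARQUET_ROW_GROUP_SIZE` environment variable declared on the option; a minimal sketch, where the `--row-group-size` long flag is an assumption based on clap's derive naming:

```sh
# Restore the previous, larger row groups via the documented env var
P_PARQUET_ROW_GROUP_SIZE=1048576 cargo run --release local-store

# Equivalent CLI flag, assuming clap derives it from the field name
cargo run --release -- local-store --row-group-size 1048576
```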

src/handlers/airplane.rs

Lines changed: 3 additions & 3 deletions
```diff
@@ -38,7 +38,7 @@ use crate::handlers::http::query::{into_query, update_schema_when_distributed};
 use crate::handlers::livetail::cross_origin_config;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::parseable::PARSEABLE;
-use crate::query::{TableScanVisitor, QUERY_SESSION};
+use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
 use crate::utils::arrow::flight::{
     append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
     send_to_ingester,
@@ -215,8 +215,8 @@ impl FlightService for AirServiceImpl {
             Status::permission_denied("User Does not have permission to access this")
         })?;
         let time = Instant::now();
-        let (records, _) = query
-            .execute(stream_name.clone())
+
+        let (records, _) = execute(query, &stream_name)
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
```

src/handlers/http/cluster/mod.rs

Lines changed: 23 additions & 58 deletions
```diff
@@ -367,64 +367,29 @@ pub async fn sync_role_update_with_ingestors(
     Ok(())
 }

-pub async fn fetch_daily_stats_from_ingestors(
-    stream_name: &str,
+pub fn fetch_daily_stats_from_ingestors(
     date: &str,
+    stream_meta_list: &[ObjectStoreFormat],
 ) -> Result<Stats, StreamError> {
-    let mut total_events_ingested: u64 = 0;
-    let mut total_ingestion_size: u64 = 0;
-    let mut total_storage_size: u64 = 0;
-
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        StreamError::Anyhow(err)
-    })?;
-    for ingestor in ingestor_infos.iter() {
-        let uri = Url::parse(&format!(
-            "{}{}/metrics",
-            &ingestor.domain_name,
-            base_path_without_preceding_slash()
-        ))
-        .map_err(|err| {
-            StreamError::Anyhow(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
-        })?;
-
-        let res = HTTP_CLIENT
-            .get(uri)
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .header(header::CONTENT_TYPE, "application/json")
-            .send()
-            .await;
-
-        if let Ok(res) = res {
-            let text = res
-                .text()
-                .await
-                .map_err(|err| StreamError::Anyhow(anyhow::anyhow!("Request failed: {}", err)))?;
-            let lines: Vec<Result<String, std::io::Error>> =
-                text.lines().map(|line| Ok(line.to_owned())).collect_vec();
-
-            let sample = prometheus_parse::Scrape::parse(lines.into_iter())
-                .map_err(|err| {
-                    StreamError::Anyhow(anyhow::anyhow!(
-                        "Invalid URL in Ingestor Metadata: {}",
-                        err
-                    ))
-                })?
-                .samples;
-
-            let (events_ingested, ingestion_size, storage_size) =
-                Metrics::get_daily_stats_from_samples(sample, stream_name, date);
-            total_events_ingested += events_ingested;
-            total_ingestion_size += ingestion_size;
-            total_storage_size += storage_size;
+    // for the given date, get the stats from the ingestors
+    let mut events_ingested = 0;
+    let mut ingestion_size = 0;
+    let mut storage_size = 0;
+
+    for meta in stream_meta_list.iter() {
+        for manifest in meta.snapshot.manifest_list.iter() {
+            if manifest.time_lower_bound.date_naive().to_string() == date {
+                events_ingested += manifest.events_ingested;
+                ingestion_size += manifest.ingestion_size;
+                storage_size += manifest.storage_size;
+            }
         }
     }

     let stats = Stats {
-        events: total_events_ingested,
-        ingestion: total_ingestion_size,
-        storage: total_storage_size,
+        events: events_ingested,
+        ingestion: ingestion_size,
+        storage: storage_size,
     };
     Ok(stats)
 }
@@ -474,17 +439,17 @@ pub async fn fetch_stats_from_ingestors(
         Utc::now(),
         IngestionStats::new(
             count,
-            format!("{} Bytes", ingestion_size),
+            ingestion_size,
             lifetime_count,
-            format!("{} Bytes", lifetime_ingestion_size),
+            lifetime_ingestion_size,
             deleted_count,
-            format!("{} Bytes", deleted_ingestion_size),
+            deleted_ingestion_size,
             "json",
         ),
         StorageStats::new(
-            format!("{} Bytes", storage_size),
-            format!("{} Bytes", lifetime_storage_size),
-            format!("{} Bytes", deleted_storage_size),
+            storage_size,
+            lifetime_storage_size,
+            deleted_storage_size,
             "parquet",
         ),
     );
```
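With this change, `IngestionStats` and `StorageStats` carry raw byte counts instead of preformatted `"<n> Bytes"` strings, so API consumers receive numeric fields. A quick way to inspect the new shape, assuming a local instance on the default port with the default admin credentials and the standard per-stream stats endpoint (`<stream-name>` is a placeholder):

```sh
# Hypothetical check: stats fields should now be numeric bytes, not strings
curl -s -u admin:admin http://localhost:8000/api/v1/logstream/<stream-name>/stats
```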
