Skip to content

Commit 5bbd309

Browse files
author
Devdutt Shenoi
committed
Merge remote-tracking branch 'origin/main'
2 parents 9b0d865 + 8381e72 commit 5bbd309

37 files changed

+1341
-169
lines changed

src/alerts/mod.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datafusion::common::tree_node::TreeNode;
2424
use http::StatusCode;
2525
use itertools::Itertools;
2626
use once_cell::sync::Lazy;
27+
use serde::Serialize;
2728
use serde_json::Error as SerdeError;
2829
use std::collections::{HashMap, HashSet};
2930
use std::fmt::{self, Display};
@@ -873,3 +874,52 @@ impl Alerts {
873874
Ok(())
874875
}
875876
}
877+
878+
/// Aggregated overview of all configured alerts, serialized for API responses.
///
/// Produced by [`get_alerts_info`]; counts are broken out both by alert
/// state and by severity level.
#[derive(Debug, Serialize)]
pub struct AlertsInfo {
    // Total number of registered alerts.
    total: u64,
    // Per-state counts (see `AlertState`).
    silenced: u64,
    resolved: u64,
    triggered: u64,
    // Per-severity counts (see `Severity`).
    low: u64,
    medium: u64,
    high: u64,
}
888+
889+
// TODO: add RBAC
890+
pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
891+
let alerts = ALERTS.alerts.read().await;
892+
let mut total = 0;
893+
let mut silenced = 0;
894+
let mut resolved = 0;
895+
let mut triggered = 0;
896+
let mut low = 0;
897+
let mut medium = 0;
898+
let mut high = 0;
899+
900+
for (_, alert) in alerts.iter() {
901+
total += 1;
902+
match alert.state {
903+
AlertState::Silenced => silenced += 1,
904+
AlertState::Resolved => resolved += 1,
905+
AlertState::Triggered => triggered += 1,
906+
}
907+
908+
match alert.severity {
909+
Severity::Low => low += 1,
910+
Severity::Medium => medium += 1,
911+
Severity::High => high += 1,
912+
_ => {}
913+
}
914+
}
915+
916+
Ok(AlertsInfo {
917+
total,
918+
silenced,
919+
resolved,
920+
triggered,
921+
low,
922+
medium,
923+
high,
924+
})
925+
}

src/catalog/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,12 @@ pub async fn remove_manifest_from_snapshot(
340340
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
341341
}
342342
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
343+
Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
344+
std::io::Error::new(
345+
std::io::ErrorKind::Unsupported,
346+
"Can't remove manifest from within Index server",
347+
),
348+
))),
343349
}
344350
}
345351

@@ -350,6 +356,7 @@ pub async fn get_first_event(
350356
) -> Result<Option<String>, ObjectStorageError> {
351357
let mut first_event_at: String = String::default();
352358
match PARSEABLE.options.mode {
359+
Mode::Index => unimplemented!(),
353360
Mode::All | Mode::Ingest => {
354361
// get current snapshot
355362
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();

src/enterprise/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod utils;

src/enterprise/utils.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use std::{collections::HashMap, path::PathBuf, sync::Arc};
2+
3+
use datafusion::{common::Column, prelude::Expr};
4+
use itertools::Itertools;
5+
use relative_path::RelativePathBuf;
6+
7+
use crate::query::stream_schema_provider::extract_primary_filter;
8+
use crate::{
9+
catalog::{
10+
manifest::{File, Manifest},
11+
snapshot, Snapshot,
12+
},
13+
event,
14+
parseable::PARSEABLE,
15+
query::{stream_schema_provider::ManifestExt, PartialTimeFilter},
16+
storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
17+
utils::time::TimeRange,
18+
};
19+
20+
pub fn create_time_filter(
21+
time_range: &TimeRange,
22+
time_partition: Option<String>,
23+
table_name: &str,
24+
) -> Vec<Expr> {
25+
let mut new_filters = vec![];
26+
let start_time = time_range.start.naive_utc();
27+
let end_time = time_range.end.naive_utc();
28+
let mut _start_time_filter: Expr;
29+
let mut _end_time_filter: Expr;
30+
31+
match time_partition {
32+
Some(time_partition) => {
33+
_start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
34+
.binary_expr(Expr::Column(Column::new(
35+
Some(table_name.to_owned()),
36+
time_partition.clone(),
37+
)));
38+
_end_time_filter =
39+
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(
40+
Expr::Column(Column::new(Some(table_name.to_owned()), time_partition)),
41+
);
42+
}
43+
None => {
44+
_start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
45+
.binary_expr(Expr::Column(Column::new(
46+
Some(table_name.to_owned()),
47+
event::DEFAULT_TIMESTAMP_KEY,
48+
)));
49+
_end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
50+
.binary_expr(Expr::Column(Column::new(
51+
Some(table_name.to_owned()),
52+
event::DEFAULT_TIMESTAMP_KEY,
53+
)));
54+
}
55+
}
56+
57+
new_filters.push(_start_time_filter);
58+
new_filters.push(_end_time_filter);
59+
60+
new_filters
61+
}
62+
63+
pub async fn fetch_parquet_file_paths(
64+
stream: &str,
65+
time_range: &TimeRange,
66+
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
67+
let glob_storage = PARSEABLE.storage.get_object_store();
68+
69+
let object_store_format = glob_storage.get_object_store_format(stream).await?;
70+
71+
let time_partition = object_store_format.time_partition;
72+
73+
let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);
74+
75+
let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);
76+
77+
let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();
78+
79+
let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
80+
let obs = glob_storage
81+
.get_objects(
82+
Some(&path),
83+
Box::new(|file_name| file_name.ends_with("stream.json")),
84+
)
85+
.await;
86+
if let Ok(obs) = obs {
87+
for ob in obs {
88+
if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
89+
let snapshot = object_store_format.snapshot;
90+
for manifest in snapshot.manifest_list {
91+
merged_snapshot.manifest_list.push(manifest);
92+
}
93+
}
94+
}
95+
}
96+
97+
let manifest_files = collect_manifest_files(
98+
glob_storage,
99+
merged_snapshot
100+
.manifests(&time_filters)
101+
.into_iter()
102+
.sorted_by_key(|file| file.time_lower_bound)
103+
.map(|item| item.manifest_path)
104+
.collect(),
105+
)
106+
.await?;
107+
108+
let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();
109+
110+
let mut selected_files = manifest_files
111+
.into_iter()
112+
.flat_map(|file| file.files)
113+
.rev()
114+
.collect_vec();
115+
116+
for filter in time_filter_expr {
117+
selected_files.retain(|file| !file.can_be_pruned(&filter))
118+
}
119+
120+
selected_files
121+
.into_iter()
122+
.map(|file| {
123+
let date = file.file_path.split("/").collect_vec();
124+
125+
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
126+
127+
let date = RelativePathBuf::from_iter(date);
128+
129+
parquet_files.entry(date).or_default().push(file);
130+
})
131+
.for_each(|_| {});
132+
133+
Ok(parquet_files)
134+
}
135+
136+
async fn collect_manifest_files(
137+
storage: Arc<dyn ObjectStorage>,
138+
manifest_urls: Vec<String>,
139+
) -> Result<Vec<Manifest>, ObjectStorageError> {
140+
let mut tasks = Vec::new();
141+
manifest_urls.into_iter().for_each(|path| {
142+
let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
143+
let storage = Arc::clone(&storage);
144+
tasks.push(tokio::task::spawn(async move {
145+
storage.get_object(&path).await
146+
}));
147+
});
148+
149+
let mut op = Vec::new();
150+
for task in tasks {
151+
let file = task.await??;
152+
op.push(file);
153+
}
154+
155+
Ok(op
156+
.into_iter()
157+
.map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest"))
158+
.collect())
159+
}

src/handlers/http/logstream.rs

Lines changed: 19 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use self::error::StreamError;
20-
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
20+
use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
2121
use super::query::update_schema_when_distributed;
2222
use crate::event::format::override_data_type;
2323
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
@@ -263,64 +263,26 @@ pub async fn get_stats(
263263
let stats = stats::get_current_stats(&stream_name, "json")
264264
.ok_or_else(|| StreamNotFound(stream_name.clone()))?;
265265

266-
let ingestor_stats: Option<Vec<QueriedStats>> = None;
267-
268-
let hash_map = PARSEABLE.streams.read().expect("Readable");
269-
let stream_meta = &hash_map
270-
.get(&stream_name)
271-
.ok_or_else(|| StreamNotFound(stream_name.clone()))?
272-
.metadata
273-
.read()
274-
.expect(LOCK_EXPECT);
275-
276266
let time = Utc::now();
277267

278-
let stats = match &stream_meta.first_event_at {
279-
Some(_) => {
280-
let ingestion_stats = IngestionStats::new(
281-
stats.current_stats.events,
282-
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
283-
stats.lifetime_stats.events,
284-
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
285-
stats.deleted_stats.events,
286-
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
287-
"json",
288-
);
289-
let storage_stats = StorageStats::new(
290-
format!("{} {}", stats.current_stats.storage, "Bytes"),
291-
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
292-
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
293-
"parquet",
294-
);
295-
296-
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
297-
}
298-
299-
None => {
300-
let ingestion_stats = IngestionStats::new(
301-
stats.current_stats.events,
302-
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
303-
stats.lifetime_stats.events,
304-
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
305-
stats.deleted_stats.events,
306-
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
307-
"json",
308-
);
309-
let storage_stats = StorageStats::new(
310-
format!("{} {}", stats.current_stats.storage, "Bytes"),
311-
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
312-
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
313-
"parquet",
314-
);
315-
316-
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
317-
}
318-
};
319-
let stats = if let Some(mut ingestor_stats) = ingestor_stats {
320-
ingestor_stats.push(stats);
321-
merge_quried_stats(ingestor_stats)
322-
} else {
323-
stats
268+
let stats = {
269+
let ingestion_stats = IngestionStats::new(
270+
stats.current_stats.events,
271+
format!("{} Bytes", stats.current_stats.ingestion),
272+
stats.lifetime_stats.events,
273+
format!("{} Bytes", stats.lifetime_stats.ingestion),
274+
stats.deleted_stats.events,
275+
format!("{} Bytes", stats.deleted_stats.ingestion),
276+
"json",
277+
);
278+
let storage_stats = StorageStats::new(
279+
format!("{} Bytes", stats.current_stats.storage),
280+
format!("{} Bytes", stats.lifetime_stats.storage),
281+
format!("{} Bytes", stats.deleted_stats.storage),
282+
"parquet",
283+
);
284+
285+
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
324286
};
325287

326288
let stats = serde_json::to_value(stats)?;

src/handlers/http/middleware.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,25 @@ where
357357
Ok(res)
358358
})
359359
}
360+
361+
Mode::Index => {
362+
let accessable_endpoints = ["create", "delete"];
363+
let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
364+
if !cond {
365+
Box::pin(async {
366+
Err(actix_web::error::ErrorUnauthorized(
367+
"Only Index API can be accessed in Index Mode",
368+
))
369+
})
370+
} else {
371+
let fut = self.service.call(req);
372+
373+
Box::pin(async move {
374+
let res = fut.await?;
375+
Ok(res)
376+
})
377+
}
378+
}
360379
}
361380
}
362381
}

src/handlers/http/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,25 @@ pub mod logstream;
3939
pub mod middleware;
4040
pub mod modal;
4141
pub mod oidc;
42+
pub mod prism_home;
43+
pub mod prism_logstream;
4244
pub mod query;
4345
pub mod rbac;
4446
pub mod role;
4547
pub mod users;
4648
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
4749
pub const API_BASE_PATH: &str = "api";
4850
pub const API_VERSION: &str = "v1";
51+
pub const PRISM_BASE_PATH: &str = "prism";
4952

5053
pub fn base_path() -> String {
5154
format!("/{API_BASE_PATH}/{API_VERSION}")
5255
}
5356

57+
pub fn prism_base_path() -> String {
58+
format!("/{API_BASE_PATH}/{PRISM_BASE_PATH}/{API_VERSION}")
59+
}
60+
5461
pub fn metrics_path() -> String {
5562
format!("{}/metrics", base_path())
5663
}

0 commit comments

Comments
 (0)