Updates #1408 (Merged)
src/alerts/alert_structs.rs (7 additions, 0 deletions)

```diff
@@ -270,6 +270,13 @@ impl AlertRequest {
             TARGETS.get_target_by_id(id).await?;
         }
         let datasets = resolve_stream_names(&self.query)?;
+
+        if datasets.len() != 1 {
+            return Err(AlertError::ValidationFailure(format!(
+                "Query should include only one dataset. Found: {datasets:?}"
+            )));
+        }
+
         let config = AlertConfig {
             version: AlertVersion::from(CURRENT_ALERTS_VERSION),
             id: Ulid::new(),
```
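Context for the new guard: `resolve_stream_names` can return any number of datasets from the alert's SQL, while alert evaluation assumes exactly one. A minimal standalone sketch of the same validation shape (the `resolve_stream_names` stub below is hypothetical; the real parser lives elsewhere in the crate):

```rust
// Hypothetical stand-in: grab the dataset named after FROM.
// The crate's real resolve_stream_names parses the SQL properly.
fn resolve_stream_names(query: &str) -> Vec<String> {
    query
        .split_whitespace()
        .skip_while(|w| !w.eq_ignore_ascii_case("from"))
        .skip(1)
        .take(1)
        .map(str::to_owned)
        .collect()
}

// Mirrors the guard added above: exactly one dataset or a validation error.
fn validate_single_dataset(query: &str) -> Result<String, String> {
    let datasets = resolve_stream_names(query);
    if datasets.len() != 1 {
        return Err(format!(
            "Query should include only one dataset. Found: {datasets:?}"
        ));
    }
    Ok(datasets.into_iter().next().unwrap())
}

fn main() {
    assert!(validate_single_dataset("SELECT * FROM app_logs").is_ok());
    assert!(validate_single_dataset("SELECT 1").is_err());
}
```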
src/handlers/http/cluster/mod.rs (1 addition, 1 deletion)

```diff
@@ -1114,7 +1114,7 @@ struct QuerierStatus {
     last_used: Option<Instant>,
 }
 
-async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
+pub async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
     // Get all querier metadata
     let querier_metadata: Vec<NodeMetadata> = get_node_info(NodeType::Querier).await?;
```
src/handlers/http/cluster/utils.rs (9 additions, 3 deletions)

```diff
@@ -19,6 +19,7 @@
 use crate::{
     INTRA_CLUSTER_CLIENT,
     handlers::http::{base_path_without_preceding_slash, modal::NodeType},
+    prism::logstream::PrismLogstreamError,
 };
 use actix_web::http::header;
 use chrono::{DateTime, Utc};
@@ -135,7 +136,12 @@ impl StorageStats {
     }
 }
 
-pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
+pub fn merge_queried_stats(stats: Vec<QueriedStats>) -> Result<QueriedStats, PrismLogstreamError> {
+    if stats.len() < 2 {
+        return Err(PrismLogstreamError::Anyhow(anyhow::Error::msg(
+            "Expected at least two logstreams in merge_queried_stats",
+        )));
+    }
     // get the stream name
     let stream_name = stats[1].stream.clone();
@@ -167,12 +173,12 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
             deleted_size: acc.deleted_size + x.deleted_size,
         });
 
-    QueriedStats::new(
+    Ok(QueriedStats::new(
         &stream_name,
         min_time,
         cumulative_ingestion,
         cumulative_storage,
-    )
+    ))
 }
 
 pub async fn check_liveness(domain_name: &str) -> bool {
```
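Worth noting: `merge_queried_stats` reads the stream name from `stats[1]`, which would panic on a vector with fewer than two entries; the new length guard turns that panic into a typed error and makes the function fallible for every caller. A simplified, self-contained model of the guard-then-fold shape (the `Stats` type is illustrative, not the crate's `QueriedStats`):

```rust
#[derive(Debug, Clone)]
struct Stats {
    stream: String,
    events: u64,
}

// Simplified merge: refuse fewer than two entries up front, then fold
// the per-node stats into one cumulative record, as the real function does.
fn merge_stats(stats: Vec<Stats>) -> Result<Stats, String> {
    if stats.len() < 2 {
        return Err("Expected at least two logstreams in merge_stats".into());
    }
    let stream = stats[1].stream.clone(); // safe now: len() >= 2
    let events = stats.iter().map(|s| s.events).sum();
    Ok(Stats { stream, events })
}

fn main() {
    let merged = merge_stats(vec![
        Stats { stream: "app".into(), events: 10 },
        Stats { stream: "app".into(), events: 32 },
    ]);
    assert_eq!(merged.unwrap().events, 42);
    assert!(merge_stats(vec![]).is_err());
}
```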
src/handlers/http/modal/query/querier_logstream.rs (24 additions, 10 deletions)

```diff
@@ -33,14 +33,17 @@ use tracing::{error, warn};
 static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
 
 use crate::{
-    handlers::http::{
-        base_path_without_preceding_slash,
-        cluster::{
-            self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
-            utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats},
+    handlers::{
+        UPDATE_STREAM_KEY,
+        http::{
+            base_path_without_preceding_slash,
+            cluster::{
+                self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
+                utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats},
+            },
+            logstream::error::StreamError,
+            modal::{NodeMetadata, NodeType},
         },
-        logstream::error::StreamError,
-        modal::{NodeMetadata, NodeType},
     },
     hottier::HotTierManager,
     parseable::{PARSEABLE, StreamNotFound},
@@ -115,14 +118,24 @@ pub async fn put_stream(
     body: Bytes,
 ) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
-    let _ = CREATE_STREAM_LOCK.lock().await;
+    let _guard = CREATE_STREAM_LOCK.lock().await;
     let headers = PARSEABLE
         .create_update_stream(req.headers(), &body, &stream_name)
         .await?;
 
+    let is_update = if let Some(val) = headers.get(UPDATE_STREAM_KEY) {
+        val.to_str().unwrap() == "true"
+    } else {
+        false
+    };
+
     sync_streams_with_ingestors(headers, body, &stream_name).await?;
 
-    Ok(("Log stream created", StatusCode::OK))
+    if is_update {
+        Ok(("Log stream updated", StatusCode::OK))
+    } else {
+        Ok(("Log stream created", StatusCode::OK))
+    }
 }
 
 pub async fn get_stats(
@@ -218,7 +231,8 @@ pub async fn get_stats(
 
     let stats = if let Some(mut ingestor_stats) = ingestor_stats {
         ingestor_stats.push(stats);
-        merge_quried_stats(ingestor_stats)
+        merge_queried_stats(ingestor_stats)
+            .map_err(|e| StreamError::Anyhow(anyhow::Error::msg(e.to_string())))?
     } else {
         stats
     };
```
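The subtle fix here is the lock binding: `let _ = CREATE_STREAM_LOCK.lock().await` drops the `MutexGuard` on that same line, so the lock was released immediately and never actually serialized concurrent `put_stream` calls. Binding to `_guard` keeps the guard alive until the end of the function. A runnable sketch of the difference (assumes the `tokio` crate with the `sync`, `rt`, and `macros` features):

```rust
use tokio::sync::Mutex;

static LOCK: Mutex<()> = Mutex::const_new(());

#[tokio::main]
async fn main() {
    // `let _ = ...` drops the guard immediately: no mutual exclusion.
    let _ = LOCK.lock().await;

    // This second acquisition succeeds only because the first guard is
    // already gone, which is exactly the bug the PR fixes.
    let _guard = LOCK.lock().await;

    // ... critical section runs while `_guard` is held ...
} // `_guard` dropped here, releasing the lock
```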
src/handlers/http/query.rs (14 additions, 6 deletions)

```diff
@@ -19,6 +19,7 @@
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
 use crate::option::Mode;
+use crate::rbac::map::SessionKey;
 use crate::utils::arrow::record_batches_to_json;
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
@@ -43,7 +44,7 @@ use std::time::Instant
 use tokio::task::JoinSet;
 use tracing::{error, warn};
 
-use crate::event::commit_schema;
+use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::parseable::{PARSEABLE, StreamNotFound};
 use crate::query::error::ExecuteError;
@@ -79,7 +80,7 @@ pub struct Query {
 /// TODO: Improve this function and make this a part of the query API
 pub async fn get_records_and_fields(
     query_request: &Query,
-    req: &HttpRequest,
+    creds: &SessionKey,
 ) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
     let session_state = QUERY_SESSION.state();
     let time_range =
@@ -89,8 +90,8 @@ pub async fn get_records_and_fields(
     create_streams_for_distributed(tables.clone()).await?;
 
     let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?;
-    let creds = extract_session_key_from_req(req)?;
-    let permissions = Users.get_permissions(&creds);
+
+    let permissions = Users.get_permissions(creds);
 
     user_auth_for_datasets(&permissions, &tables).await?;
@@ -350,7 +351,12 @@ pub async fn get_counts(
     // if the user has given a sql query (counts call with filters applied), then use this flow
     // this could include filters or group by
     if body.conditions.is_some() {
-        let sql = body.get_df_sql().await?;
+        let time_partition = PARSEABLE
+            .get_stream(&body.stream)?
+            .get_time_partition()
+            .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into());
+
+        let sql = body.get_df_sql(time_partition).await?;
 
         let query_request = Query {
             query: sql,
@@ -362,7 +368,9 @@
             filter_tags: None,
         };
 
-        let (records, _) = get_records_and_fields(&query_request, &req).await?;
+        let creds = extract_session_key_from_req(&req)?;
+
+        let (records, _) = get_records_and_fields(&query_request, &creds).await?;
 
         if let Some(records) = records {
             let json_records = record_batches_to_json(&records)?;
```
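Two threads run through these hunks: credentials are now extracted once in the handler and passed down as a `SessionKey` (decoupling `get_records_and_fields` from the actix `HttpRequest`), and the counts SQL is built against the stream's configured time partition rather than a hard-coded column. A sketch of that fallback, where `Stream` stands in for the crate's stream handle and `"p_timestamp"` is assumed to be the value of `DEFAULT_TIMESTAMP_KEY`:

```rust
// Assumed default; stands in for the crate's DEFAULT_TIMESTAMP_KEY constant.
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

// Illustrative stand-in for the handle returned by PARSEABLE.get_stream.
struct Stream {
    time_partition: Option<String>,
}

// Use the configured time partition if present, else the ingestion timestamp.
fn time_column(stream: &Stream) -> String {
    stream
        .time_partition
        .clone()
        .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into())
}

fn main() {
    assert_eq!(time_column(&Stream { time_partition: None }), "p_timestamp");
    let custom = Stream { time_partition: Some("event_ts".into()) };
    assert_eq!(time_column(&custom), "event_ts");
}
```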
src/prism/logstream/mod.rs (9 additions, 3 deletions)

```diff
@@ -30,7 +30,7 @@ use crate::{
     handlers::http::{
         cluster::{
             fetch_stats_from_ingestors,
-            utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats},
+            utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats},
         },
         logstream::error::StreamError,
         query::{QueryError, update_schema_when_distributed},
@@ -136,7 +136,7 @@ async fn get_stats(stream_name: &str) -> Result<QueriedStats, PrismLogstreamErro
 
     let stats = if let Some(mut ingestor_stats) = ingestor_stats {
         ingestor_stats.push(stats);
-        merge_quried_stats(ingestor_stats)
+        merge_queried_stats(ingestor_stats)?
     } else {
         stats
     };
@@ -218,7 +218,7 @@ pub struct PrismDatasetResponse {
 
 /// Request parameters for retrieving Prism dataset information.
 /// Defines which streams to query
-#[derive(Deserialize, Default)]
+#[derive(Deserialize, Default, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PrismDatasetRequest {
     /// List of stream names to query
@@ -381,6 +381,10 @@ pub enum PrismLogstreamError {
     Execute(#[from] ExecuteError),
     #[error("Auth: {0}")]
     Auth(#[from] actix_web::Error),
+    #[error("SerdeError: {0}")]
+    SerdeError(#[from] serde_json::Error),
+    #[error("ReqwestError: {0}")]
+    ReqwestError(#[from] reqwest::Error),
 }
 
 impl actix_web::ResponseError for PrismLogstreamError {
@@ -393,6 +397,8 @@ impl actix_web::ResponseError for PrismLogstreamError {
             PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND,
             PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::SerdeError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::ReqwestError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED,
         }
```
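The new variants follow the `thiserror` pattern already used by this enum: `#[from]` derives the `From` impl that `?` relies on, and `status_code` maps each wrapped error to a response status. A minimal sketch of the same pattern (assumes the `thiserror` and `serde_json` crates as dependencies):

```rust
use thiserror::Error;

#[derive(Debug, Error)]
enum PrismError {
    // #[from] generates From<serde_json::Error> for PrismError,
    // so `?` converts the error automatically.
    #[error("SerdeError: {0}")]
    Serde(#[from] serde_json::Error),
}

fn parse(input: &str) -> Result<serde_json::Value, PrismError> {
    Ok(serde_json::from_str(input)?)
}

fn main() {
    assert!(parse(r#"{"ok": true}"#).is_ok());
    assert!(parse("not json").is_err());
}
```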
src/query/mod.rs (8 additions, 8 deletions)

```diff
@@ -56,7 +56,7 @@ use crate::catalog::Snapshot as CatalogSnapshot;
 use crate::catalog::column::{Int64Type, TypedStatistics};
 use crate::catalog::manifest::Manifest;
 use crate::catalog::snapshot::Snapshot;
-use crate::event::{self, DEFAULT_TIMESTAMP_KEY};
+use crate::event::DEFAULT_TIMESTAMP_KEY;
 use crate::handlers::http::query::QueryError;
 use crate::option::Mode;
 use crate::parseable::PARSEABLE;
@@ -345,7 +345,7 @@ impl CountsRequest {
             .get_stream(&self.stream)
             .map_err(|err| anyhow::Error::msg(err.to_string()))?
             .get_time_partition()
-            .unwrap_or_else(|| event::DEFAULT_TIMESTAMP_KEY.to_owned());
+            .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.to_owned());
 
         // get time range
         let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
@@ -441,7 +441,7 @@ impl CountsRequest {
     }
 
     /// This function will get executed only if self.conditions is some
-    pub async fn get_df_sql(&self) -> Result<String, QueryError> {
+    pub async fn get_df_sql(&self, time_column: String) -> Result<String, QueryError> {
         // unwrap because we have asserted that it is some
         let count_conditions = self.conditions.as_ref().unwrap();
 
@@ -452,19 +452,19 @@ impl CountsRequest {
         let date_bin = if dur.num_minutes() <= 60 * 10 {
             // date_bin 1 minute
             format!(
-                "CAST(DATE_BIN('1 minute', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
+                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
                 self.stream
             )
         } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
             // date_bin 1 hour
             format!(
-                "CAST(DATE_BIN('1 hour', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
+                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
                 self.stream
             )
         } else {
             // date_bin 1 day
             format!(
-                "CAST(DATE_BIN('1 day', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
+                "CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
                 self.stream
             )
         };
@@ -486,7 +486,7 @@ impl CountsRequest {
 }
 
 /// Response for the counts API
-#[derive(Debug, Serialize, Clone)]
+#[derive(Debug, Serialize, Clone, Deserialize)]
 pub struct CountsResponse {
     /// Fields in the log stream
     pub fields: Vec<String>,
@@ -653,7 +653,7 @@ fn table_contains_any_time_filters(
         })
         .any(|expr| {
             matches!(&*expr.left, Expr::Column(Column { name, .. })
-                if name == time_column)
+                if name == &default_timestamp || name == time_column)
         })
 }
```
Expand Down