Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ pub struct Cli {

/// CORS behaviour
pub cors: bool,

/// The local hot_tier path is used for optimising the query performance in the distributed systems
pub hot_tier_storage_path: Option<PathBuf>,

///maximum disk usage allowed
pub max_disk_usage: f64,
}

impl Cli {
Expand Down Expand Up @@ -134,6 +140,8 @@ impl Cli {
pub const DEFAULT_PASSWORD: &'static str = "admin";
pub const FLIGHT_PORT: &'static str = "flight-port";
pub const CORS: &'static str = "cors";
pub const HOT_TIER_PATH: &'static str = "hot-tier-path";
pub const MAX_DISK_USAGE: &'static str = "max-disk-usage";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
Expand Down Expand Up @@ -395,7 +403,27 @@ impl Cli {
"lz4",
"zstd"])
.help("Parquet compression algorithm"),
).group(
)
.arg(
Arg::new(Self::HOT_TIER_PATH)
.long(Self::HOT_TIER_PATH)
.env("P_HOT_TIER_DIR")
.value_name("DIR")
.value_parser(validation::canonicalize_path)
.help("Local path on this device to be used for hot tier data")
.next_line_help(true),
)
.arg(
Arg::new(Self::MAX_DISK_USAGE)
.long(Self::MAX_DISK_USAGE)
.env("P_MAX_DISK_USAGE_PERCENT")
.value_name("percentage")
.default_value("80.0")
.value_parser(validation::validate_disk_usage)
.help("Maximum allowed disk usage in percentage e.g 90.0 for 90%")
.next_line_help(true),
)
.group(
ArgGroup::new("oidc")
.args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
.requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
Expand Down Expand Up @@ -532,6 +560,12 @@ impl FromArgMatches for Cli {
_ => unreachable!(),
};

self.hot_tier_storage_path = m.get_one::<PathBuf>(Self::HOT_TIER_PATH).cloned();
self.max_disk_usage = m
.get_one::<f64>(Self::MAX_DISK_USAGE)
.cloned()
.expect("default for max disk usage");

Ok(())
}
}
137 changes: 136 additions & 1 deletion server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
UPDATE_STREAM_KEY,
};
use crate::hottier::{HotTierManager, StreamHotTier};
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::{Mode, CONFIG};
Expand All @@ -37,6 +38,7 @@ use crate::{
catalog::{self, remove_manifest_from_snapshot},
event, stats,
};

use crate::{metadata, validator};
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
Expand Down Expand Up @@ -919,6 +921,122 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
Ok((web::Json(stream_info), StatusCode::OK))
}

pub async fn put_stream_hot_tier(
req: HttpRequest,
body: web::Json<serde_json::Value>,
) -> Result<impl Responder, StreamError> {
if CONFIG.parseable.mode != Mode::Query {
return Err(StreamError::Custom {
msg: "Hot tier can only be enabled in query mode".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}
if CONFIG.parseable.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

if STREAM_INFO
.get_time_partition(&stream_name)
.unwrap()
.is_some()
{
return Err(StreamError::Custom {
msg: "Hot tier can not be enabled for stream with time partition".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

let body = body.into_inner();
let mut hottier: StreamHotTier = match serde_json::from_value(body) {
Ok(hottier) => hottier,
Err(err) => return Err(StreamError::InvalidHotTierConfig(err)),
};

validator::hot_tier(&hottier.size.to_string())?;

STREAM_INFO.set_hot_tier(&stream_name, true)?;
if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
hottier.used_size = Some("0GiB".to_string());
hottier.available_size = Some(hottier.size.clone());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
.await?;
let storage = CONFIG.storage().get_object_store();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
stream_metadata.hot_tier_enabled = Some(true);
storage
.put_stream_manifest(&stream_name, &stream_metadata)
.await?;
}

Ok((
format!("hot tier set for stream {stream_name}"),
StatusCode::OK,
))
}

pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
if CONFIG.parseable.mode != Mode::Query {
return Err(StreamError::Custom {
msg: "Hot tier can only be enabled in query mode".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

if CONFIG.parseable.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

if let Some(hot_tier_manager) = HotTierManager::global() {
let hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
Ok((web::Json(hot_tier), StatusCode::OK))
} else {
Err(StreamError::Custom {
msg: format!("hot tier not initialised for stream {}", stream_name),
status: (StatusCode::BAD_REQUEST),
})
}
}

pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, StreamError> {
if CONFIG.parseable.mode != Mode::Query {
return Err(StreamError::Custom {
msg: "Hot tier can only be enabled in query mode".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

if CONFIG.parseable.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.delete_hot_tier(&stream_name).await?;
}
Ok((
format!("hot tier deleted for stream {stream_name}"),
StatusCode::OK,
))
}
#[allow(unused)]
fn classify_json_error(kind: serde_json::error::Category) -> StatusCode {
match kind {
Expand All @@ -935,9 +1053,12 @@ pub mod error {
use http::StatusCode;

use crate::{
hottier::HotTierError,
metadata::error::stream_info::MetadataError,
storage::ObjectStorageError,
validator::error::{AlertValidationError, StreamNameValidationError},
validator::error::{
AlertValidationError, HotTierValidationError, StreamNameValidationError,
},
};

#[allow(unused)]
Expand Down Expand Up @@ -997,6 +1118,16 @@ pub mod error {
Network(#[from] reqwest::Error),
#[error("Could not deserialize into JSON object, {0}")]
SerdeError(#[from] serde_json::Error),
#[error(
"Hot tier is not enabled at the server config, cannot enable hot tier for stream {0}"
)]
HotTierNotEnabled(String),
#[error("failed to enable hottier due to err: {0}")]
InvalidHotTierConfig(serde_json::Error),
#[error("Hot tier validation failed due to {0}")]
HotTierValidation(#[from] HotTierValidationError),
#[error("{0}")]
HotTierError(#[from] HotTierError),
}

impl actix_web::ResponseError for StreamError {
Expand Down Expand Up @@ -1030,6 +1161,10 @@ pub mod error {
StreamError::Network(err) => {
err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
}
StreamError::HotTierNotEnabled(_) => StatusCode::BAD_REQUEST,
StreamError::InvalidHotTierConfig(_) => StatusCode::BAD_REQUEST,
StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
6 changes: 5 additions & 1 deletion server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::handlers::airplane;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};

use crate::hottier::HotTierManager;
use crate::rbac::role::Action;
use crate::sync;
use crate::users::dashboards::DASHBOARDS;
Expand Down Expand Up @@ -188,6 +188,10 @@ impl QueryServer {
if matches!(init_cluster_metrics_schedular(), Ok(())) {
log::info!("Cluster metrics scheduler started successfully");
}

if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.download_from_s3()?;
};
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync();
Expand Down
19 changes: 19 additions & 0 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,25 @@ impl Server {
.to(logstream::get_cache_enabled)
.authorize_for_stream(Action::GetCacheEnabled),
),
)
.service(
web::resource("/hottier")
// PUT "/logstream/{logstream}/hottier" ==> Set hottier for given logstream
.route(
web::put()
.to(logstream::put_stream_hot_tier)
.authorize_for_stream(Action::PutHotTierEnabled),
)
.route(
web::get()
.to(logstream::get_stream_hot_tier)
.authorize_for_stream(Action::GetHotTierEnabled),
)
.route(
web::delete()
.to(logstream::delete_stream_hot_tier)
.authorize_for_stream(Action::DeleteHotTierEnabled),
),
),
)
}
Expand Down
Loading
Loading