Skip to content

Commit 2a3825a

Browse files
author
Devdutt Shenoi
committed
Merge remote-tracking branch 'origin/main' into query-param
2 parents 5e7d8d1 + 523ecc7 commit 2a3825a

File tree

14 files changed

+239
-276
lines changed

14 files changed

+239
-276
lines changed

src/banner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) {
104104
Staging Path: \"{}\"",
105105
"Storage:".to_string().bold(),
106106
config.get_storage_mode_string(),
107-
config.staging_dir().to_string_lossy(),
107+
config.options.staging_dir().to_string_lossy(),
108108
);
109109

110110
if let Some(path) = &config.options.hot_tier_storage_path {

src/cli.rs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use clap::Parser;
20-
use std::path::PathBuf;
20+
use std::{env, fs, path::PathBuf};
2121

2222
use url::Url;
2323

@@ -385,4 +385,74 @@ impl Options {
385385
pub fn is_default_creds(&self) -> bool {
386386
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
387387
}
388+
389+
/// Path to staging directory, ensures that it exists or panics
390+
pub fn staging_dir(&self) -> &PathBuf {
391+
fs::create_dir_all(&self.local_staging_path)
392+
.expect("Should be able to create dir if doesn't exist");
393+
394+
&self.local_staging_path
395+
}
396+
397+
/// TODO: refactor and document
398+
pub fn get_url(&self) -> Url {
399+
if self.ingestor_endpoint.is_empty() {
400+
return format!(
401+
"{}://{}",
402+
self.get_scheme(),
403+
self.address
404+
)
405+
.parse::<Url>() // if the value was improperly set, this will panic before hand
406+
.unwrap_or_else(|err| {
407+
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
408+
});
409+
}
410+
411+
let ingestor_endpoint = &self.ingestor_endpoint;
412+
413+
if ingestor_endpoint.starts_with("http") {
414+
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
415+
}
416+
417+
let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();
418+
419+
if addr_from_env.len() != 2 {
420+
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
421+
}
422+
423+
let mut hostname = addr_from_env[0].to_string();
424+
let mut port = addr_from_env[1].to_string();
425+
426+
// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
427+
// fetch the value from the specified env vars
428+
if hostname.starts_with('$') {
429+
let var_hostname = hostname[1..].to_string();
430+
hostname = env::var(&var_hostname).unwrap_or_default();
431+
432+
if hostname.is_empty() {
433+
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
434+
}
435+
if hostname.starts_with("http") {
436+
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
437+
} else {
438+
hostname = format!("{}://{}", self.get_scheme(), hostname);
439+
}
440+
}
441+
442+
if port.starts_with('$') {
443+
let var_port = port[1..].to_string();
444+
port = env::var(&var_port).unwrap_or_default();
445+
446+
if port.is_empty() {
447+
panic!(
448+
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
449+
var_port
450+
);
451+
}
452+
}
453+
454+
format!("{}://{}:{}", self.get_scheme(), hostname, port)
455+
.parse::<Url>()
456+
.expect("Valid URL")
457+
}
388458
}

src/handlers/http/about.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub async fn about() -> Json<Value> {
6666
let staging = if PARSEABLE.options.mode == Mode::Query {
6767
"".to_string()
6868
} else {
69-
PARSEABLE.staging_dir().display().to_string()
69+
PARSEABLE.options.staging_dir().display().to_string()
7070
};
7171
let grpc_port = PARSEABLE.options.grpc_port;
7272

src/handlers/http/logstream.rs

Lines changed: 23 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ use super::query::update_schema_when_distributed;
4848

4949
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
5050
let stream_name = stream_name.into_inner();
51-
if !PARSEABLE.streams.contains(&stream_name) {
51+
// Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
52+
if PARSEABLE.check_or_load_stream(&stream_name).await {
5253
return Err(StreamNotFound(stream_name).into());
5354
}
5455

@@ -121,15 +122,11 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, St
121122
Ok((web::Json(schema), StatusCode::OK))
122123
}
123124

124-
pub async fn schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
125+
pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
125126
let stream_name = stream_name.into_inner();
126127

127128
// Ensure parseable is aware of stream in distributed mode
128-
if PARSEABLE.options.mode == Mode::Query
129-
&& PARSEABLE
130-
.create_stream_and_schema_from_storage(&stream_name)
131-
.await?
132-
{
129+
if PARSEABLE.check_or_load_stream(&stream_name).await {
133130
return Err(StreamNotFound(stream_name.clone()).into());
134131
}
135132

@@ -165,14 +162,7 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
165162
// For query mode, if the stream not found in memory map,
166163
//check if it exists in the storage
167164
//create stream and schema from storage
168-
if PARSEABLE.options.mode == Mode::Query
169-
&& matches!(
170-
PARSEABLE
171-
.create_stream_and_schema_from_storage(&stream_name)
172-
.await,
173-
Ok(false) | Err(_)
174-
)
175-
{
165+
if PARSEABLE.check_or_load_stream(&stream_name).await {
176166
return Err(StreamNotFound(stream_name.clone()).into());
177167
}
178168

@@ -185,37 +175,24 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
185175

186176
pub async fn put_retention(
187177
stream_name: Path<String>,
188-
Json(json): Json<Value>,
178+
Json(retention): Json<Retention>,
189179
) -> Result<impl Responder, StreamError> {
190180
let stream_name = stream_name.into_inner();
191181

192182
// For query mode, if the stream not found in memory map,
193183
//check if it exists in the storage
194184
//create stream and schema from storage
195-
if PARSEABLE.options.mode == Mode::Query
196-
&& matches!(
197-
PARSEABLE
198-
.create_stream_and_schema_from_storage(&stream_name)
199-
.await,
200-
Ok(false) | Err(_)
201-
)
202-
{
203-
return Err(StreamNotFound(stream_name.clone()).into());
185+
if PARSEABLE.check_or_load_stream(&stream_name).await {
186+
return Err(StreamNotFound(stream_name).into());
204187
}
205188

206-
let stream = PARSEABLE.get_stream(&stream_name)?;
207-
let retention: Retention = match serde_json::from_value(json) {
208-
Ok(retention) => retention,
209-
Err(err) => return Err(StreamError::InvalidRetentionConfig(err)),
210-
};
211-
212189
PARSEABLE
213190
.storage
214191
.get_object_store()
215192
.put_retention(&stream_name, &retention)
216193
.await?;
217194

218-
stream.set_retention(retention);
195+
PARSEABLE.get_stream(&stream_name)?.set_retention(retention);
219196

220197
Ok((
221198
format!("set retention configuration for log stream {stream_name}"),
@@ -232,16 +209,8 @@ pub async fn get_stats(
232209
// For query mode, if the stream not found in memory map,
233210
//check if it exists in the storage
234211
//create stream and schema from storage
235-
if !PARSEABLE.streams.contains(&stream_name)
236-
&& (PARSEABLE.options.mode != Mode::Query
237-
|| matches!(
238-
PARSEABLE
239-
.create_stream_and_schema_from_storage(&stream_name)
240-
.await,
241-
Ok(false) | Err(_)
242-
))
243-
{
244-
return Err(StreamNotFound(stream_name).into());
212+
if PARSEABLE.check_or_load_stream(&stream_name).await {
213+
return Err(StreamNotFound(stream_name.clone()).into());
245214
}
246215

247216
if let Some(stats) = params.get_stats(&stream_name) {
@@ -324,17 +293,13 @@ pub async fn get_stats(
324293

325294
pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
326295
let stream_name = stream_name.into_inner();
327-
if !PARSEABLE.streams.contains(&stream_name)
328-
&& (PARSEABLE.options.mode != Mode::Query
329-
|| matches!(
330-
PARSEABLE
331-
.create_stream_and_schema_from_storage(&stream_name)
332-
.await,
333-
Ok(false) | Err(_)
334-
))
335-
{
336-
return Err(StreamNotFound(stream_name).into());
296+
// For query mode, if the stream not found in memory map,
297+
//check if it exists in the storage
298+
//create stream and schema from storage
299+
if PARSEABLE.check_or_load_stream(&stream_name).await {
300+
return Err(StreamNotFound(stream_name.clone()).into());
337301
}
302+
338303
let storage = PARSEABLE.storage.get_object_store();
339304
// if first_event_at is not found in memory map, check if it exists in the storage
340305
// if it exists in the storage, update the first_event_at in memory map
@@ -383,15 +348,8 @@ pub async fn put_stream_hot_tier(
383348
// For query mode, if the stream not found in memory map,
384349
//check if it exists in the storage
385350
//create stream and schema from storage
386-
if PARSEABLE.options.mode == Mode::Query
387-
&& matches!(
388-
PARSEABLE
389-
.create_stream_and_schema_from_storage(&stream_name)
390-
.await,
391-
Ok(false) | Err(_)
392-
)
393-
{
394-
return Err(StreamNotFound(stream_name.clone()).into());
351+
if PARSEABLE.check_or_load_stream(&stream_name).await {
352+
return Err(StreamNotFound(stream_name).into());
395353
}
396354

397355
let stream = PARSEABLE.get_stream(&stream_name)?;
@@ -437,16 +395,8 @@ pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Respo
437395
// For query mode, if the stream not found in memory map,
438396
//check if it exists in the storage
439397
//create stream and schema from storage
440-
if !PARSEABLE.streams.contains(&stream_name)
441-
&& (PARSEABLE.options.mode != Mode::Query
442-
|| matches!(
443-
PARSEABLE
444-
.create_stream_and_schema_from_storage(&stream_name)
445-
.await,
446-
Ok(false) | Err(_)
447-
))
448-
{
449-
return Err(StreamNotFound(stream_name).into());
398+
if PARSEABLE.check_or_load_stream(&stream_name).await {
399+
return Err(StreamNotFound(stream_name.clone()).into());
450400
}
451401

452402
let Some(hot_tier_manager) = HotTierManager::global() else {
@@ -465,15 +415,8 @@ pub async fn delete_stream_hot_tier(
465415
// For query mode, if the stream not found in memory map,
466416
//check if it exists in the storage
467417
//create stream and schema from storage
468-
if PARSEABLE.options.mode == Mode::Query
469-
&& matches!(
470-
PARSEABLE
471-
.create_stream_and_schema_from_storage(&stream_name)
472-
.await,
473-
Ok(false) | Err(_)
474-
)
475-
{
476-
return Err(StreamNotFound(stream_name.clone()).into());
418+
if PARSEABLE.check_or_load_stream(&stream_name).await {
419+
return Err(StreamNotFound(stream_name).into());
477420
}
478421

479422
if PARSEABLE.get_stream(&stream_name)?.get_stream_type() == StreamType::Internal {

0 commit comments

Comments
 (0)