Skip to content

Commit 433a04f

Browse files
author
Devdutt Shenoi
committed
refactor: replace with single method
1 parent 1e32ad9 commit 433a04f

File tree

2 files changed

+19
-55
lines changed

2 files changed

+19
-55
lines changed

src/handlers/http/logstream.rs

Lines changed: 9 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::event::format::override_data_type;
2323
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
2424
use crate::metadata::SchemaVersion;
2525
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
26-
use crate::option::Mode;
2726
use crate::parseable::{StreamNotFound, PARSEABLE};
2827
use crate::rbac::role::Action;
2928
use crate::rbac::Users;
@@ -48,12 +47,7 @@ use tracing::warn;
4847
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
4948
let stream_name = stream_name.into_inner();
5049
// Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
51-
if !PARSEABLE.streams.contains(&stream_name)
52-
&& (PARSEABLE.options.mode != Mode::Query
53-
|| !PARSEABLE
54-
.create_stream_and_schema_from_storage(&stream_name)
55-
.await?)
56-
{
50+
if PARSEABLE.check_or_load_stream(&stream_name).await {
5751
return Err(StreamNotFound(stream_name).into());
5852
}
5953

@@ -130,12 +124,7 @@ pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, Str
130124
let stream_name = stream_name.into_inner();
131125

132126
// Ensure parseable is aware of stream in distributed mode
133-
if !PARSEABLE.streams.contains(&stream_name)
134-
&& (PARSEABLE.options.mode != Mode::Query
135-
|| !PARSEABLE
136-
.create_stream_and_schema_from_storage(&stream_name)
137-
.await?)
138-
{
127+
if PARSEABLE.check_or_load_stream(&stream_name).await {
139128
return Err(StreamNotFound(stream_name.clone()).into());
140129
}
141130

@@ -171,12 +160,7 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
171160
// For query mode, if the stream not found in memory map,
172161
//check if it exists in the storage
173162
//create stream and schema from storage
174-
if !PARSEABLE.streams.contains(&stream_name)
175-
&& (PARSEABLE.options.mode != Mode::Query
176-
|| !PARSEABLE
177-
.create_stream_and_schema_from_storage(&stream_name)
178-
.await?)
179-
{
163+
if PARSEABLE.check_or_load_stream(&stream_name).await {
180164
return Err(StreamNotFound(stream_name.clone()).into());
181165
}
182166

@@ -196,12 +180,7 @@ pub async fn put_retention(
196180
// For query mode, if the stream not found in memory map,
197181
//check if it exists in the storage
198182
//create stream and schema from storage
199-
if !PARSEABLE.streams.contains(&stream_name)
200-
&& (PARSEABLE.options.mode != Mode::Query
201-
|| !PARSEABLE
202-
.create_stream_and_schema_from_storage(&stream_name)
203-
.await?)
204-
{
183+
if PARSEABLE.check_or_load_stream(&stream_name).await {
205184
return Err(StreamNotFound(stream_name).into());
206185
}
207186

@@ -252,12 +231,7 @@ pub async fn get_stats(
252231
// For query mode, if the stream not found in memory map,
253232
//check if it exists in the storage
254233
//create stream and schema from storage
255-
if !PARSEABLE.streams.contains(&stream_name)
256-
&& (PARSEABLE.options.mode != Mode::Query
257-
|| !PARSEABLE
258-
.create_stream_and_schema_from_storage(&stream_name)
259-
.await?)
260-
{
234+
if PARSEABLE.check_or_load_stream(&stream_name).await {
261235
return Err(StreamNotFound(stream_name.clone()).into());
262236
}
263237

@@ -353,12 +327,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
353327
// For query mode, if the stream not found in memory map,
354328
//check if it exists in the storage
355329
//create stream and schema from storage
356-
if !PARSEABLE.streams.contains(&stream_name)
357-
&& (PARSEABLE.options.mode != Mode::Query
358-
|| !PARSEABLE
359-
.create_stream_and_schema_from_storage(&stream_name)
360-
.await?)
361-
{
330+
if PARSEABLE.check_or_load_stream(&stream_name).await {
362331
return Err(StreamNotFound(stream_name.clone()).into());
363332
}
364333

@@ -410,12 +379,7 @@ pub async fn put_stream_hot_tier(
410379
// For query mode, if the stream not found in memory map,
411380
//check if it exists in the storage
412381
//create stream and schema from storage
413-
if !PARSEABLE.streams.contains(&stream_name)
414-
&& (PARSEABLE.options.mode != Mode::Query
415-
|| !PARSEABLE
416-
.create_stream_and_schema_from_storage(&stream_name)
417-
.await?)
418-
{
382+
if PARSEABLE.check_or_load_stream(&stream_name).await {
419383
return Err(StreamNotFound(stream_name).into());
420384
}
421385

@@ -462,12 +426,7 @@ pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Respo
462426
// For query mode, if the stream not found in memory map,
463427
//check if it exists in the storage
464428
//create stream and schema from storage
465-
if !PARSEABLE.streams.contains(&stream_name)
466-
&& (PARSEABLE.options.mode != Mode::Query
467-
|| !PARSEABLE
468-
.create_stream_and_schema_from_storage(&stream_name)
469-
.await?)
470-
{
429+
if PARSEABLE.check_or_load_stream(&stream_name).await {
471430
return Err(StreamNotFound(stream_name.clone()).into());
472431
}
473432

@@ -487,12 +446,7 @@ pub async fn delete_stream_hot_tier(
487446
// For query mode, if the stream not found in memory map,
488447
//check if it exists in the storage
489448
//create stream and schema from storage
490-
if !PARSEABLE.streams.contains(&stream_name)
491-
&& (PARSEABLE.options.mode != Mode::Query
492-
|| !PARSEABLE
493-
.create_stream_and_schema_from_storage(&stream_name)
494-
.await?)
495-
{
449+
if PARSEABLE.check_or_load_stream(&stream_name).await {
496450
return Err(StreamNotFound(stream_name).into());
497451
}
498452

src/parseable/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,16 @@ impl Parseable {
169169
)
170170
}
171171

172+
/// Checks for the stream in memory, or loads it from storage when in distributed mode
173+
pub async fn check_or_load_stream(&self, stream_name: &str) -> bool {
174+
!self.streams.contains(stream_name)
175+
&& (self.options.mode != Mode::Query
176+
|| !self
177+
.create_stream_and_schema_from_storage(stream_name)
178+
.await
179+
.unwrap_or_default())
180+
}
181+
172182
/// Writes all streams in staging onto disk, awaiting conversion into parquet.
173183
/// Deletes all in memory recordbatches, freeing up rows in mem-writer.
174184
pub fn flush_all_streams(&self) {

0 commit comments

Comments
 (0)