Skip to content

Commit 1e32ad9

Browse files
author
Devdutt Shenoi
committed
fix: actually create stream in query mode if not present in memory
1 parent 3e54ec0 commit 1e32ad9

File tree

1 file changed

+28
-26
lines changed

1 file changed

+28
-26
lines changed

src/handlers/http/logstream.rs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,10 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
4949
let stream_name = stream_name.into_inner();
5050
// Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
5151
if !PARSEABLE.streams.contains(&stream_name)
52-
&& PARSEABLE.options.mode == Mode::Query
53-
&& matches!(
54-
PARSEABLE
52+
&& (PARSEABLE.options.mode != Mode::Query
53+
|| !PARSEABLE
5554
.create_stream_and_schema_from_storage(&stream_name)
56-
.await,
57-
Ok(true) | Err(_)
58-
)
55+
.await?)
5956
{
6057
return Err(StreamNotFound(stream_name).into());
6158
}
@@ -133,10 +130,11 @@ pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, Str
133130
let stream_name = stream_name.into_inner();
134131

135132
// Ensure parseable is aware of stream in distributed mode
136-
if PARSEABLE.options.mode == Mode::Query
137-
&& !PARSEABLE
138-
.create_stream_and_schema_from_storage(&stream_name)
139-
.await?
133+
if !PARSEABLE.streams.contains(&stream_name)
134+
&& (PARSEABLE.options.mode != Mode::Query
135+
|| !PARSEABLE
136+
.create_stream_and_schema_from_storage(&stream_name)
137+
.await?)
140138
{
141139
return Err(StreamNotFound(stream_name.clone()).into());
142140
}
@@ -173,10 +171,11 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
173171
// For query mode, if the stream not found in memory map,
174172
//check if it exists in the storage
175173
//create stream and schema from storage
176-
if PARSEABLE.options.mode == Mode::Query
177-
&& !PARSEABLE
178-
.create_stream_and_schema_from_storage(&stream_name)
179-
.await?
174+
if !PARSEABLE.streams.contains(&stream_name)
175+
&& (PARSEABLE.options.mode != Mode::Query
176+
|| !PARSEABLE
177+
.create_stream_and_schema_from_storage(&stream_name)
178+
.await?)
180179
{
181180
return Err(StreamNotFound(stream_name.clone()).into());
182181
}
@@ -197,10 +196,11 @@ pub async fn put_retention(
197196
// For query mode, if the stream not found in memory map,
198197
//check if it exists in the storage
199198
//create stream and schema from storage
200-
if PARSEABLE.options.mode == Mode::Query
201-
&& !PARSEABLE
202-
.create_stream_and_schema_from_storage(&stream_name)
203-
.await?
199+
if !PARSEABLE.streams.contains(&stream_name)
200+
&& (PARSEABLE.options.mode != Mode::Query
201+
|| !PARSEABLE
202+
.create_stream_and_schema_from_storage(&stream_name)
203+
.await?)
204204
{
205205
return Err(StreamNotFound(stream_name).into());
206206
}
@@ -410,10 +410,11 @@ pub async fn put_stream_hot_tier(
410410
// For query mode, if the stream not found in memory map,
411411
//check if it exists in the storage
412412
//create stream and schema from storage
413-
if PARSEABLE.options.mode == Mode::Query
414-
&& !PARSEABLE
415-
.create_stream_and_schema_from_storage(&stream_name)
416-
.await?
413+
if !PARSEABLE.streams.contains(&stream_name)
414+
&& (PARSEABLE.options.mode != Mode::Query
415+
|| !PARSEABLE
416+
.create_stream_and_schema_from_storage(&stream_name)
417+
.await?)
417418
{
418419
return Err(StreamNotFound(stream_name).into());
419420
}
@@ -486,10 +487,11 @@ pub async fn delete_stream_hot_tier(
486487
// For query mode, if the stream not found in memory map,
487488
//check if it exists in the storage
488489
//create stream and schema from storage
489-
if PARSEABLE.options.mode == Mode::Query
490-
&& !PARSEABLE
491-
.create_stream_and_schema_from_storage(&stream_name)
492-
.await?
490+
if !PARSEABLE.streams.contains(&stream_name)
491+
&& (PARSEABLE.options.mode != Mode::Query
492+
|| !PARSEABLE
493+
.create_stream_and_schema_from_storage(&stream_name)
494+
.await?)
493495
{
494496
return Err(StreamNotFound(stream_name).into());
495497
}

0 commit comments

Comments
 (0)