Skip to content

Commit 0a85484

Browse files
update logic for check or load stream
1 parent 0e4f849 commit 0a85484

File tree

2 files changed

+11
-19
lines changed

2 files changed

+11
-19
lines changed

src/parseable/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,14 @@ impl Parseable {
212212

213213
/// Checks for the stream in memory, or loads it from storage when in distributed mode
214214
pub async fn check_or_load_stream(&self, stream_name: &str) -> bool {
215-
!self.streams.contains(stream_name)
216-
&& (self.options.mode != Mode::Query
217-
|| self.options.mode != Mode::Prism
218-
|| !self
219-
.create_stream_and_schema_from_storage(stream_name)
220-
.await
221-
.unwrap_or_default())
215+
if self.streams.contains(stream_name) {
216+
return true;
217+
}
218+
(self.options.mode == Mode::Query || self.options.mode == Mode::Prism)
219+
&& self
220+
.create_stream_and_schema_from_storage(stream_name)
221+
.await
222+
.unwrap_or_default()
222223
}
223224

224225
// validate the storage, if the proper path for staging directory is provided

src/prism/logstream/mod.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub async fn get_prism_logstream_info(
8686

8787
async fn get_stream_schema_helper(stream_name: &str) -> Result<Arc<Schema>, StreamError> {
8888
// Ensure parseable is aware of stream in distributed mode
89-
if PARSEABLE.check_or_load_stream(stream_name).await {
89+
if !PARSEABLE.check_or_load_stream(stream_name).await {
9090
return Err(StreamNotFound(stream_name.to_owned()).into());
9191
}
9292

@@ -152,7 +152,7 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamE
152152
// For query mode, if the stream not found in memory map,
153153
//check if it exists in the storage
154154
//create stream and schema from storage
155-
if PARSEABLE.check_or_load_stream(stream_name).await {
155+
if !PARSEABLE.check_or_load_stream(stream_name).await {
156156
return Err(StreamNotFound(stream_name.to_owned()).into());
157157
}
158158

@@ -289,7 +289,7 @@ impl PrismDatasetRequest {
289289
}
290290

291291
// Skip streams that don't exist
292-
if !self.stream_exists(&stream).await {
292+
if !PARSEABLE.check_or_load_stream(&stream).await {
293293
return Ok(None);
294294
}
295295

@@ -311,15 +311,6 @@ impl PrismDatasetRequest {
311311
}
312312
}
313313

314-
async fn stream_exists(&self, stream: &str) -> bool {
315-
if PARSEABLE.check_or_load_stream(stream).await {
316-
warn!("Stream not found: {stream}");
317-
false
318-
} else {
319-
true
320-
}
321-
}
322-
323314
async fn build_dataset_response(
324315
&self,
325316
stream: String,

0 commit comments

Comments
 (0)