Skip to content

Commit 24afa12

Browse files
author
Devdutt Shenoi
committed
fix: dataloss due to contention at stream creation
1 parent 6e5242f commit 24afa12

File tree

2 files changed

+12
-7
lines changed

2 files changed

+12
-7
lines changed

src/parseable/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ impl Parseable {
171171
}
172172

173173
// Gets write privileges only for creating the stream when it doesn't already exist.
174-
self.streams.create(
174+
self.streams.get_or_create(
175175
self.options.clone(),
176176
stream_name.to_owned(),
177177
LogStreamMetadata::default(),
@@ -342,7 +342,7 @@ impl Parseable {
342342
schema_version,
343343
log_source,
344344
);
345-
self.streams.create(
345+
self.streams.get_or_create(
346346
self.options.clone(),
347347
stream_name.to_string(),
348348
metadata,
@@ -652,7 +652,7 @@ impl Parseable {
652652
SchemaVersion::V1, // New stream
653653
log_source,
654654
);
655-
self.streams.create(
655+
self.streams.get_or_create(
656656
self.options.clone(),
657657
stream_name.to_string(),
658658
metadata,

src/parseable/streams.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -737,17 +737,22 @@ pub struct Streams(RwLock<HashMap<String, StreamRef>>);
737737
// 4. When first event is sent to stream (update the schema)
738738
// 5. When set alert API is called (update the alert)
739739
impl Streams {
740-
pub fn create(
740+
/// Checks after getting an excluse lock whether already stream exists, else creates it.
741+
/// NOTE: This is done to ensure we don't have contention among threads.
742+
pub fn get_or_create(
741743
&self,
742744
options: Arc<Options>,
743745
stream_name: String,
744746
metadata: LogStreamMetadata,
745747
ingestor_id: Option<String>,
746748
) -> StreamRef {
749+
let mut guard = self.write().expect(LOCK_EXPECT);
750+
if let Some(stream) = guard.get(&stream_name) {
751+
return stream.clone();
752+
}
753+
747754
let stream = Stream::new(options, &stream_name, metadata, ingestor_id);
748-
self.write()
749-
.expect(LOCK_EXPECT)
750-
.insert(stream_name, stream.clone());
755+
guard.insert(stream_name, stream.clone());
751756

752757
stream
753758
}

0 commit comments

Comments
 (0)