Skip to content

Commit 06beee3

Browse files
duplicate check
1 parent c56ccca commit 06beee3

File tree

2 files changed

+20
-3
lines changed

2 files changed

+20
-3
lines changed

src/migration/stream_metadata_migration.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,15 @@ pub fn v5_v6(mut stream_metadata: Value) -> Value {
195195
let mut log_source_entry = LogSourceEntry::default();
196196
match log_source {
197197
Some(stream_log_source) => {
198-
let log_source: LogSource = serde_json::from_value(stream_log_source.clone()).unwrap();
199-
log_source_entry.add_log_source(log_source, HashSet::new());
198+
if let Ok(log_source) = serde_json::from_value::<LogSource>(stream_log_source.clone()) {
199+
log_source_entry.add_log_source(log_source, HashSet::new());
200+
} else {
201+
// If deserialization fails, use default
202+
stream_metadata_map.insert(
203+
"log_source".to_owned(),
204+
LogSourceEntry::default().to_value(),
205+
);
206+
}
200207
stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
201208
}
202209
None => {

src/parseable/streams.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -686,10 +686,20 @@ impl Stream {
686686

687687
pub fn add_log_source(&self, log_source: LogSourceEntry) {
688688
let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
689+
// Check if this log source format already exists
690+
for existing in &metadata.log_source {
691+
if existing.log_source_format == log_source.log_source_format {
692+
self.add_fields_to_log_source(
693+
&log_source.log_source_format,
694+
log_source.fields.clone(),
695+
);
696+
return;
697+
}
698+
}
689699
metadata.log_source.push(log_source);
690700
}
691701

692-
pub fn add_fields_to_log_source(&self, log_source: &LogSource, fields: Vec<String>) {
702+
pub fn add_fields_to_log_source(&self, log_source: &LogSource, fields: HashSet<String>) {
693703
let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
694704
for log_source_entry in metadata.log_source.iter_mut() {
695705
if log_source_entry.log_source_format == *log_source {

0 commit comments

Comments
 (0)