Skip to content

Commit 01d7e8b

Browse files
tests for migration, coderabbitai suggestions
1 parent 06beee3 commit 01d7e8b

File tree

2 files changed

+44
-16
lines changed

2 files changed

+44
-16
lines changed

src/migration/stream_metadata_migration.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -193,23 +193,12 @@ pub fn v5_v6(mut stream_metadata: Value) -> Value {
193193
);
194194
let log_source = stream_metadata_map.get("log_source");
195195
let mut log_source_entry = LogSourceEntry::default();
196-
match log_source {
197-
Some(stream_log_source) => {
198-
if let Ok(log_source) = serde_json::from_value::<LogSource>(stream_log_source.clone()) {
199-
log_source_entry.add_log_source(log_source, HashSet::new());
200-
} else {
201-
// If deserialization fails, use default
202-
stream_metadata_map.insert(
203-
"log_source".to_owned(),
204-
LogSourceEntry::default().to_value(),
205-
);
206-
}
207-
stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
208-
}
209-
None => {
210-
stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
196+
if log_source.is_some() {
197+
if let Ok(log_source) = serde_json::from_value::<LogSource>(log_source.unwrap().clone()) {
198+
log_source_entry.add_log_source(log_source, HashSet::new());
211199
}
212200
}
201+
stream_metadata_map.insert("log_source".to_owned(), log_source_entry.to_value());
213202
stream_metadata
214203
}
215204

@@ -238,3 +227,30 @@ fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value {
238227
snapshot_map.insert("manifest_list".to_owned(), Value::Array(new_manifest_list));
239228
snapshot
240229
}
230+
231+
#[cfg(test)]
232+
mod tests {
233+
#[test]
234+
fn test_v5_v6_with_log_source() {
235+
let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"OtelLogs"});
236+
let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"OtelLogs","fields":[]}]});
237+
let updated_stream_metadata = super::v5_v6(stream_metadata.clone());
238+
assert_eq!(updated_stream_metadata, expected);
239+
}
240+
241+
#[test]
242+
fn test_v5_v6_with_default_log_source() {
243+
let stream_metadata = serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"Json"});
244+
let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]});
245+
let updated_stream_metadata = super::v5_v6(stream_metadata.clone());
246+
assert_eq!(updated_stream_metadata, expected);
247+
}
248+
249+
#[test]
250+
fn test_v5_v6_without_log_source() {
251+
let stream_metadata = serde_json::json!({"version":"v4","schema_version":"v0","objectstore-format":"v4","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined"});
252+
let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]});
253+
let updated_stream_metadata = super::v5_v6(stream_metadata.clone());
254+
assert_eq!(updated_stream_metadata, expected);
255+
}
256+
}

src/parseable/streams.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,8 +685,20 @@ impl Stream {
685685
}
686686

687687
pub fn add_log_source(&self, log_source: LogSourceEntry) {
688+
let metadata = self.metadata.read().expect(LOCK_EXPECT);
689+
for existing in &metadata.log_source {
690+
if existing.log_source_format == log_source.log_source_format {
691+
drop(metadata);
692+
self.add_fields_to_log_source(
693+
&log_source.log_source_format,
694+
log_source.fields.clone(),
695+
);
696+
return;
697+
}
698+
}
699+
drop(metadata);
700+
688701
let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
689-
// Check if this log source format already exists
690702
for existing in &metadata.log_source {
691703
if existing.log_source_format == log_source.log_source_format {
692704
self.add_fields_to_log_source(

0 commit comments

Comments
 (0)