Commit cb883df

update for distributed
1 parent c7923b0 commit cb883df

File tree

3 files changed: +62 −0 lines changed


src/handlers/http/logstream.rs

Lines changed: 8 additions & 0 deletions
@@ -309,6 +309,14 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
         None
     };

+    let stream_log_source = storage
+        .get_log_source_from_storage(&stream_name)
+        .await
+        .unwrap_or_default();
+    PARSEABLE
+        .update_log_source(&stream_name, stream_log_source)
+        .await?;
+
     let hash_map = PARSEABLE.streams.read().unwrap();
     let stream_meta = hash_map
         .get(&stream_name)
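
A minimal, self-contained sketch of the fallback behaviour in this hunk: the storage read is wrapped in .unwrap_or_default(), so a failed read degrades to an empty log-source list instead of failing the stream-info request. read_log_source below is a made-up stand-in, not the Parseable API.

// Stand-in for `storage.get_log_source_from_storage(..)`; hypothetical.
fn read_log_source(fail: bool) -> Result<Vec<String>, std::io::Error> {
    if fail {
        Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            "object store unreachable",
        ))
    } else {
        Ok(vec!["otel-logs".to_string()])
    }
}

fn main() {
    // Mirrors `.unwrap_or_default()` in the handler: an Err becomes an empty Vec.
    assert_eq!(
        read_log_source(false).unwrap_or_default(),
        vec!["otel-logs".to_string()]
    );
    assert!(read_log_source(true).unwrap_or_default().is_empty());
}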

src/parseable/mod.rs

Lines changed: 19 additions & 0 deletions
@@ -822,6 +822,25 @@ impl Parseable {

         Some(first_event_at.to_string())
     }
+
+    pub async fn update_log_source(
+        &self,
+        stream_name: &str,
+        log_source: Vec<LogSourceEntry>,
+    ) -> Result<(), StreamError> {
+        let storage = self.storage.get_object_store();
+        if let Err(err) = storage
+            .update_log_source_in_stream(stream_name, &log_source)
+            .await
+        {
+            return Err(StreamError::Storage(err));
+        }
+
+        let stream = self.get_stream(stream_name).expect(STREAM_EXISTS);
+        stream.set_log_source(log_source);
+
+        Ok(())
+    }
 }

 pub fn validate_static_schema(
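
The explicit if let Err(err) block above is one way to wrap the storage error in StreamError::Storage before returning; the same control flow can also be written with map_err. A runnable sketch with placeholder error types (not the real Parseable types, and not part of this commit):

#[derive(Debug)]
struct ObjectStorageError(#[allow(dead_code)] String);

#[derive(Debug)]
enum StreamError {
    Storage(ObjectStorageError),
}

// Stand-in for `storage.update_log_source_in_stream(..)`; hypothetical.
fn update_log_source_in_stream(fail: bool) -> Result<(), ObjectStorageError> {
    if fail {
        Err(ObjectStorageError("put stream.json failed".to_string()))
    } else {
        Ok(())
    }
}

fn update_log_source(fail: bool) -> Result<(), StreamError> {
    // Equivalent to the `if let Err(..)` early return in the hunk above:
    // wrap the storage error and bail out before touching in-memory state.
    update_log_source_in_stream(fail).map_err(StreamError::Storage)?;
    // ...the in-memory stream update would follow here.
    Ok(())
}

fn main() {
    println!("{:?}", update_log_source(false)); // Ok(())
    println!("{:?}", update_log_source(true)); // Err(Storage(..))
}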

src/storage/object_storage.rs

Lines changed: 35 additions & 0 deletions
@@ -687,6 +687,41 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok(stream_metas)
     }

+    async fn get_log_source_from_storage(
+        &self,
+        stream_name: &str,
+    ) -> Result<Vec<LogSourceEntry>, ObjectStorageError> {
+        let mut all_log_sources: Vec<LogSourceEntry> = Vec::new();
+        let stream_metas = self.get_stream_meta_from_storage(stream_name).await;
+        if let Ok(stream_metas) = stream_metas {
+            for stream_meta in stream_metas.iter() {
+                // fetch unique log sources and their fields
+                all_log_sources.extend(stream_meta.log_source.clone());
+            }
+        }
+
+        // merge fields of same log source
+        let mut merged_log_sources: Vec<LogSourceEntry> = Vec::new();
+        let mut log_source_map: HashMap<LogSource, HashSet<String>> = HashMap::new();
+        for log_source_entry in all_log_sources {
+            let log_source_format = log_source_entry.log_source_format;
+            let fields = log_source_entry.fields;
+
+            log_source_map
+                .entry(log_source_format)
+                .or_default()
+                .extend(fields);
+        }
+
+        for (log_source_format, fields) in log_source_map {
+            merged_log_sources.push(LogSourceEntry {
+                log_source_format,
+                fields: fields.into_iter().collect(),
+            });
+        }
+        Ok(merged_log_sources)
+    }
+
     /// Retrieves the earliest first-event-at from the storage for the specified stream.
     ///
     /// This function fetches the object-store format from all the stream.json files for the given stream from the storage,
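
For context, a self-contained sketch of the merge this trait method performs: log-source entries collected from each stream.json copy are grouped by format and their field sets unioned, so duplicate formats collapse into a single entry. The LogSource and LogSourceEntry definitions below are simplified stand-ins for the real Parseable types.

use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum LogSource {
    OtelLogs,
    Json,
}

#[derive(Debug, Clone)]
struct LogSourceEntry {
    log_source_format: LogSource,
    fields: Vec<String>,
}

// Same grouping as get_log_source_from_storage: union the fields per format.
fn merge_log_sources(all: Vec<LogSourceEntry>) -> Vec<LogSourceEntry> {
    let mut map: HashMap<LogSource, HashSet<String>> = HashMap::new();
    for LogSourceEntry { log_source_format, fields } in all {
        map.entry(log_source_format).or_default().extend(fields);
    }
    map.into_iter()
        .map(|(log_source_format, fields)| LogSourceEntry {
            log_source_format,
            fields: fields.into_iter().collect(),
        })
        .collect()
}

fn main() {
    // Two copies report OtelLogs with overlapping fields, one reports Json;
    // the result has one OtelLogs entry (fields unioned) and one Json entry.
    let from_storage = vec![
        LogSourceEntry {
            log_source_format: LogSource::OtelLogs,
            fields: vec!["body".to_string(), "severity_text".to_string()],
        },
        LogSourceEntry {
            log_source_format: LogSource::OtelLogs,
            fields: vec!["body".to_string(), "trace_id".to_string()],
        },
        LogSourceEntry {
            log_source_format: LogSource::Json,
            fields: vec!["message".to_string()],
        },
    ];
    println!("{:#?}", merge_log_sources(from_storage));
}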
