@@ -106,7 +106,6 @@ fn get_file_bounds(
106106}
107107
108108pub async fn update_snapshot (
109- storage : Arc < dyn ObjectStorage > ,
110109 stream_name : & str ,
111110 changes : Vec < manifest:: File > ,
112111) -> Result < ( ) , ObjectStorageError > {
@@ -124,9 +123,9 @@ pub async fn update_snapshot(
124123 let partition_groups = group_changes_by_partition ( changes, & meta. time_partition ) ;
125124
126125 let new_manifest_entries =
127- process_partition_groups ( partition_groups, & mut meta, storage . clone ( ) , stream_name) . await ?;
126+ process_partition_groups ( partition_groups, & mut meta, stream_name) . await ?;
128127
129- finalize_snapshot_update ( meta, new_manifest_entries, storage , stream_name) . await
128+ finalize_snapshot_update ( meta, new_manifest_entries, stream_name) . await
130129}
131130
132131/// Groups manifest file changes by time partitions using Rayon for parallel processing
@@ -214,7 +213,6 @@ fn extract_partition_metrics(stream_name: &str, partition_lower: DateTime<Utc>)
214213async fn process_partition_groups (
215214 partition_groups : HashMap < ( DateTime < Utc > , DateTime < Utc > ) , Vec < manifest:: File > > ,
216215 meta : & mut ObjectStoreFormat ,
217- storage : Arc < dyn ObjectStorage > ,
218216 stream_name : & str ,
219217) -> Result < Vec < snapshot:: ManifestItem > , ObjectStorageError > {
220218 let mut new_manifest_entries = Vec :: new ( ) ;
@@ -227,7 +225,6 @@ async fn process_partition_groups(
227225 partition_lower,
228226 partition_changes,
229227 meta,
230- storage. clone ( ) ,
231228 stream_name,
232229 events_ingested,
233230 ingestion_size,
@@ -249,7 +246,6 @@ async fn process_single_partition(
249246 partition_lower : DateTime < Utc > ,
250247 partition_changes : Vec < manifest:: File > ,
251248 meta : & mut ObjectStoreFormat ,
252- storage : Arc < dyn ObjectStorage > ,
253249 stream_name : & str ,
254250 events_ingested : u64 ,
255251 ingestion_size : u64 ,
@@ -263,7 +259,6 @@ async fn process_single_partition(
263259 handle_existing_partition (
264260 pos,
265261 partition_changes,
266- storage,
267262 stream_name,
268263 meta,
269264 events_ingested,
@@ -277,7 +272,6 @@ async fn process_single_partition(
277272 create_manifest (
278273 partition_lower,
279274 partition_changes,
280- storage,
281275 stream_name,
282276 false ,
283277 meta. clone ( ) ,
@@ -294,7 +288,6 @@ async fn process_single_partition(
294288async fn handle_existing_partition (
295289 pos : usize ,
296290 partition_changes : Vec < manifest:: File > ,
297- storage : Arc < dyn ObjectStorage > ,
298291 stream_name : & str ,
299292 meta : & mut ObjectStoreFormat ,
300293 events_ingested : u64 ,
@@ -303,22 +296,35 @@ async fn handle_existing_partition(
303296 partition_lower : DateTime < Utc > ,
304297) -> Result < Option < snapshot:: ManifestItem > , ObjectStorageError > {
305298 let manifests = & mut meta. snapshot . manifest_list ;
306- let path = partition_path (
307- stream_name,
308- manifests[ pos] . time_lower_bound ,
309- manifests[ pos] . time_upper_bound ,
310- ) ;
311299
312300 let manifest_file_name = manifest_path ( "" ) . to_string ( ) ;
313301 let should_update = manifests[ pos] . manifest_path . contains ( & manifest_file_name) ;
314302
315303 if should_update {
316- if let Some ( mut manifest) = storage. get_manifest ( & path) . await ? {
304+ if let Some ( mut manifest) = PARSEABLE
305+ . metastore
306+ . get_manifest (
307+ stream_name,
308+ manifests[ pos] . time_lower_bound ,
309+ manifests[ pos] . time_upper_bound ,
310+ )
311+ . await
312+ . map_err ( |e| ObjectStorageError :: MetastoreError ( Box :: new ( e. to_detail ( ) ) ) ) ?
313+ {
317314 // Update existing manifest
318315 for change in partition_changes {
319316 manifest. apply_change ( change) ;
320317 }
321- storage. put_manifest ( & path, manifest) . await ?;
318+ PARSEABLE
319+ . metastore
320+ . put_manifest (
321+ & manifest,
322+ stream_name,
323+ manifests[ pos] . time_lower_bound ,
324+ manifests[ pos] . time_upper_bound ,
325+ )
326+ . await
327+ . map_err ( |e| ObjectStorageError :: MetastoreError ( Box :: new ( e. to_detail ( ) ) ) ) ?;
322328
323329 manifests[ pos] . events_ingested = events_ingested;
324330 manifests[ pos] . ingestion_size = ingestion_size;
@@ -329,7 +335,6 @@ async fn handle_existing_partition(
329335 create_manifest (
330336 partition_lower,
331337 partition_changes,
332- storage,
333338 stream_name,
334339 false ,
335340 meta. clone ( ) ,
@@ -344,7 +349,6 @@ async fn handle_existing_partition(
344349 create_manifest (
345350 partition_lower,
346351 partition_changes,
347- storage,
348352 stream_name,
349353 false ,
350354 ObjectStoreFormat :: default ( ) ,
@@ -360,7 +364,6 @@ async fn handle_existing_partition(
360364async fn finalize_snapshot_update (
361365 mut meta : ObjectStoreFormat ,
362366 new_manifest_entries : Vec < snapshot:: ManifestItem > ,
363- storage : Arc < dyn ObjectStorage > ,
364367 stream_name : & str ,
365368) -> Result < ( ) , ObjectStorageError > {
366369 // Add all new manifest entries to the snapshot
@@ -370,15 +373,18 @@ async fn finalize_snapshot_update(
370373 if let Some ( stats) = stats {
371374 meta. stats = stats;
372375 }
373- storage. put_stream_manifest ( stream_name, & meta) . await ?;
376+ PARSEABLE
377+ . metastore
378+ . put_stream_json ( & meta, stream_name)
379+ . await
380+ . map_err ( |e| ObjectStorageError :: MetastoreError ( Box :: new ( e. to_detail ( ) ) ) ) ?;
374381 Ok ( ( ) )
375382}
376383
377384#[ allow( clippy:: too_many_arguments) ]
378385async fn create_manifest (
379386 lower_bound : DateTime < Utc > ,
380387 changes : Vec < manifest:: File > ,
381- storage : Arc < dyn ObjectStorage > ,
382388 stream_name : & str ,
383389 update_snapshot : bool ,
384390 mut meta : ObjectStoreFormat ,
@@ -424,15 +430,19 @@ async fn create_manifest(
424430 }
425431 }
426432
427- let manifest_file_name = manifest_path ( "" ) . to_string ( ) ;
428- let path = partition_path ( stream_name , lower_bound , upper_bound ) . join ( & manifest_file_name ) ;
429- storage
430- . put_object ( & path , serde_json :: to_vec ( & manifest ) ? . into ( ) )
431- . await ?;
433+ PARSEABLE
434+ . metastore
435+ . put_manifest ( & manifest , stream_name , lower_bound , upper_bound )
436+ . await
437+ . map_err ( |e| ObjectStorageError :: MetastoreError ( Box :: new ( e . to_detail ( ) ) ) ) ?;
432438
433- let path_url = storage. absolute_url ( & path) ;
439+ let path_url = & PARSEABLE
440+ . metastore
441+ . get_manifest_path ( stream_name, lower_bound, upper_bound)
442+ . await
443+ . map_err ( |e| ObjectStorageError :: MetastoreError ( Box :: new ( e. to_detail ( ) ) ) ) ?;
434444 let new_snapshot_entry = snapshot:: ManifestItem {
435- manifest_path : path_url. to_string ( ) ,
445+ manifest_path : path_url. to_owned ( ) ,
436446 time_lower_bound : lower_bound,
437447 time_upper_bound : upper_bound,
438448 events_ingested,
@@ -449,7 +459,13 @@ async fn create_manifest(
449459 meta. stats = stats;
450460 }
451461 meta. first_event_at = first_event_at;
452- storage. put_stream_manifest ( stream_name, & meta) . await ?;
462+
463+ PARSEABLE
464+ . metastore
465+ . put_stream_json ( & meta, stream_name)
466+ . await
467+ . map_err ( |e| ObjectStorageError :: MetastoreError ( Box :: new ( e. to_detail ( ) ) ) ) ?;
468+
453469 Ok ( None )
454470 } else {
455471 Ok ( Some ( new_snapshot_entry) )
0 commit comments