@@ -4,7 +4,7 @@ use super::block_stream::{
44use super :: client:: ChainClient ;
55use super :: Blockchain ;
66use crate :: blockchain:: block_stream:: FirehoseCursor ;
7- use crate :: blockchain:: TriggerFilter ;
7+ use crate :: blockchain:: { Block , TriggerFilter } ;
88use crate :: prelude:: * ;
99use crate :: util:: backoff:: ExponentialBackoff ;
1010use crate :: { firehose, firehose:: FirehoseEndpoint } ;
@@ -108,6 +108,7 @@ where
108108 C : Blockchain ,
109109{
110110 pub fn new < F > (
111+ chain_store : Arc < dyn ChainStore > ,
111112 deployment : DeploymentHash ,
112113 client : Arc < ChainClient < C > > ,
113114 subgraph_current_block : Option < BlockPtr > ,
@@ -134,6 +135,7 @@ where
134135 let metrics = FirehoseBlockStreamMetrics :: new ( registry, deployment. clone ( ) ) ;
135136 FirehoseBlockStream {
136137 stream : Box :: pin ( stream_blocks (
138+ chain_store,
137139 client,
138140 cursor,
139141 deployment,
@@ -148,6 +150,7 @@ where
148150}
149151
150152fn stream_blocks < C : Blockchain , F : FirehoseMapper < C > > (
153+ chain_store : Arc < dyn ChainStore > ,
151154 client : Arc < ChainClient < C > > ,
152155 mut latest_cursor : FirehoseCursor ,
153156 deployment : DeploymentHash ,
@@ -257,6 +260,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
257260
258261 for await response in stream {
259262 match process_firehose_response(
263+ chain_store. clone( ) ,
260264 & endpoint,
261265 response,
262266 & mut check_subgraph_continuity,
@@ -344,6 +348,7 @@ enum BlockResponse<C: Blockchain> {
344348}
345349
346350async fn process_firehose_response < C : Blockchain , F : FirehoseMapper < C > > (
351+ chain_store : Arc < dyn ChainStore > ,
347352 endpoint : & Arc < FirehoseEndpoint > ,
348353 result : Result < firehose:: Response , Status > ,
349354 check_subgraph_continuity : & mut bool ,
@@ -359,11 +364,46 @@ async fn process_firehose_response<C: Blockchain, F: FirehoseMapper<C>>(
359364 . await
360365 . context ( "Mapping block to BlockStreamEvent failed" ) ?;
361366
367+ if let BlockStreamEvent :: ProcessBlock ( block, _) = & event {
368+ info ! ( logger, "Inserting block to cache" ; "block_number" => block. block. number( ) , "block_hash" => format!( "{:?}" , block. block. hash( ) ) ) ;
369+
370+ let start_time = Instant :: now ( ) ;
371+
372+ let result = chain_store
373+ . insert_block ( Arc :: new ( block. block . clone ( ) ) )
374+ . await ;
375+
376+ let elapsed = start_time. elapsed ( ) ;
377+
378+ match result {
379+ Ok ( _) => {
380+ trace ! (
381+ logger,
382+ "Block inserted to cache successfully" ;
383+ "block_number" => block. block. number( ) ,
384+ "block_hash" => format!( "{:?}" , block. block. hash( ) ) ,
385+ "time_taken" => format!( "{:?}" , elapsed)
386+ ) ;
387+ }
388+ Err ( e) => {
389+ error ! (
390+ logger,
391+ "Failed to insert block into store" ;
392+ "block_number" => block. block. number( ) ,
393+ "block_hash" => format!( "{:?}" , block. block. hash( ) ) ,
394+ "error" => format!( "{:?}" , e) ,
395+ "time_taken" => format!( "{:?}" , elapsed)
396+ ) ;
397+ }
398+ }
399+ }
400+
362401 if * check_subgraph_continuity {
363402 info ! ( logger, "Firehose started from a subgraph pointer without an existing cursor, ensuring chain continuity" ) ;
364403
365404 if let BlockStreamEvent :: ProcessBlock ( ref block, _) = event {
366405 let previous_block_ptr = block. parent_ptr ( ) ;
406+
367407 if previous_block_ptr. is_some ( ) && previous_block_ptr. as_ref ( ) != subgraph_current_block
368408 {
369409 warn ! ( & logger,
0 commit comments