@@ -790,6 +790,32 @@ async fn fetch_unique_blocks_from_cache(
790790 ( blocks, missing_blocks)
791791}
792792
793+ /// Fetches blocks by their numbers, first attempting to load from cache.
794+ /// Missing blocks are retrieved from an external source, with all blocks sorted and converted to `BlockFinality` format.
795+ async fn load_blocks < F , Fut > (
796+ logger : & Logger ,
797+ chain_store : Arc < dyn ChainStore > ,
798+ block_numbers : HashSet < BlockNumber > ,
799+ fetch_missing : F ,
800+ ) -> Result < Vec < BlockFinality > >
801+ where
802+ F : FnOnce ( Vec < BlockNumber > ) -> Fut ,
803+ Fut : Future < Output = Result < Vec < Arc < ExtendedBlockPtr > > > > ,
804+ {
805+ // Fetch cached blocks and identify missing ones
806+ let ( mut cached_blocks, missing_block_numbers) =
807+ fetch_unique_blocks_from_cache ( logger, chain_store, block_numbers) . await ;
808+
809+ // Fetch missing blocks if any
810+ if !missing_block_numbers. is_empty ( ) {
811+ let missing_blocks = fetch_missing ( missing_block_numbers) . await ?;
812+ cached_blocks. extend ( missing_blocks) ;
813+ cached_blocks. sort_by_key ( |block| block. number ) ;
814+ }
815+
816+ Ok ( cached_blocks. into_iter ( ) . map ( BlockFinality :: Ptr ) . collect ( ) )
817+ }
818+
793819#[ async_trait]
794820impl TriggersAdapterTrait < Chain > for TriggersAdapter {
795821 async fn scan_triggers (
@@ -819,32 +845,6 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
819845 logger : Logger ,
820846 block_numbers : HashSet < BlockNumber > ,
821847 ) -> Result < Vec < BlockFinality > > {
822- // Common function to handle block loading, regardless of source
823- async fn load_blocks < F , Fut > (
824- logger : & Logger ,
825- chain_store : Arc < dyn ChainStore > ,
826- block_numbers : HashSet < BlockNumber > ,
827- fetch_missing : F ,
828- ) -> Result < Vec < BlockFinality > >
829- where
830- F : FnOnce ( Vec < BlockNumber > ) -> Fut ,
831- Fut : Future < Output = Result < Vec < Arc < ExtendedBlockPtr > > > > ,
832- {
833- // Fetch cached blocks and identify missing ones
834- let ( mut cached_blocks, missing_block_numbers) =
835- fetch_unique_blocks_from_cache ( logger, chain_store, block_numbers) . await ;
836-
837- // Fetch missing blocks if any
838- if !missing_block_numbers. is_empty ( ) {
839- let missing_blocks = fetch_missing ( missing_block_numbers) . await ?;
840- cached_blocks. extend ( missing_blocks) ;
841- cached_blocks. sort_by_key ( |block| block. number ) ;
842- }
843-
844- // Convert to BlockFinality
845- Ok ( cached_blocks. into_iter ( ) . map ( BlockFinality :: Ptr ) . collect ( ) )
846- }
847-
848848 match & * self . chain_client {
849849 ChainClient :: Firehose ( endpoints) => {
850850 trace ! (
@@ -1193,3 +1193,138 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
11931193 . await
11941194 }
11951195}
1196+
1197+ #[ cfg( test) ]
1198+ mod tests {
1199+ use graph:: blockchain:: mock:: MockChainStore ;
1200+ use graph:: { slog, tokio} ;
1201+
1202+ use super :: * ;
1203+ use std:: collections:: HashSet ;
1204+ use std:: sync:: Arc ;
1205+
1206+ // Helper function to create test blocks
1207+ fn create_test_block ( number : BlockNumber , hash : & str ) -> ExtendedBlockPtr {
1208+ let hash = BlockHash ( hash. as_bytes ( ) . to_vec ( ) . into_boxed_slice ( ) ) ;
1209+ let ptr = BlockPtr :: new ( hash. clone ( ) , number) ;
1210+ ExtendedBlockPtr {
1211+ hash,
1212+ number,
1213+ parent_hash : BlockHash ( vec ! [ 0 ; 32 ] . into_boxed_slice ( ) ) ,
1214+ timestamp : BlockTime :: for_test ( & ptr) ,
1215+ }
1216+ }
1217+
1218+ #[ tokio:: test]
1219+ async fn test_fetch_unique_blocks_single_block ( ) {
1220+ let logger = Logger :: root ( slog:: Discard , o ! ( ) ) ;
1221+ let mut chain_store = MockChainStore :: default ( ) ;
1222+
1223+ // Add a single block
1224+ let block = create_test_block ( 1 , "block1" ) ;
1225+ chain_store. blocks . insert ( 1 , vec ! [ block. clone( ) ] ) ;
1226+
1227+ let block_numbers: HashSet < _ > = vec ! [ 1 ] . into_iter ( ) . collect ( ) ;
1228+
1229+ let ( blocks, missing) =
1230+ fetch_unique_blocks_from_cache ( & logger, Arc :: new ( chain_store) , block_numbers) . await ;
1231+
1232+ assert_eq ! ( blocks. len( ) , 1 ) ;
1233+ assert_eq ! ( blocks[ 0 ] . number, 1 ) ;
1234+ assert ! ( missing. is_empty( ) ) ;
1235+ }
1236+
1237+ #[ tokio:: test]
1238+ async fn test_fetch_unique_blocks_duplicate_blocks ( ) {
1239+ let logger = Logger :: root ( slog:: Discard , o ! ( ) ) ;
1240+ let mut chain_store = MockChainStore :: default ( ) ;
1241+
1242+ // Add multiple blocks for the same number
1243+ let block1 = create_test_block ( 1 , "block1a" ) ;
1244+ let block2 = create_test_block ( 1 , "block1b" ) ;
1245+ chain_store
1246+ . blocks
1247+ . insert ( 1 , vec ! [ block1. clone( ) , block2. clone( ) ] ) ;
1248+
1249+ let block_numbers: HashSet < _ > = vec ! [ 1 ] . into_iter ( ) . collect ( ) ;
1250+
1251+ let ( blocks, missing) =
1252+ fetch_unique_blocks_from_cache ( & logger, Arc :: new ( chain_store) , block_numbers) . await ;
1253+
1254+ // Should filter out the duplicate block
1255+ assert ! ( blocks. is_empty( ) ) ;
1256+ assert_eq ! ( missing, vec![ 1 ] ) ;
1257+ assert_eq ! ( missing[ 0 ] , 1 ) ;
1258+ }
1259+
1260+ #[ tokio:: test]
1261+ async fn test_fetch_unique_blocks_missing_blocks ( ) {
1262+ let logger = Logger :: root ( slog:: Discard , o ! ( ) ) ;
1263+ let mut chain_store = MockChainStore :: default ( ) ;
1264+
1265+ // Add block number 1 but not 2
1266+ let block = create_test_block ( 1 , "block1" ) ;
1267+ chain_store. blocks . insert ( 1 , vec ! [ block. clone( ) ] ) ;
1268+
1269+ let block_numbers: HashSet < _ > = vec ! [ 1 , 2 ] . into_iter ( ) . collect ( ) ;
1270+
1271+ let ( blocks, missing) =
1272+ fetch_unique_blocks_from_cache ( & logger, Arc :: new ( chain_store) , block_numbers) . await ;
1273+
1274+ assert_eq ! ( blocks. len( ) , 1 ) ;
1275+ assert_eq ! ( blocks[ 0 ] . number, 1 ) ;
1276+ assert_eq ! ( missing, vec![ 2 ] ) ;
1277+ }
1278+
1279+ #[ tokio:: test]
1280+ async fn test_fetch_unique_blocks_multiple_valid_blocks ( ) {
1281+ let logger = Logger :: root ( slog:: Discard , o ! ( ) ) ;
1282+ let mut chain_store = MockChainStore :: default ( ) ;
1283+
1284+ // Add multiple valid blocks
1285+ let block1 = create_test_block ( 1 , "block1" ) ;
1286+ let block2 = create_test_block ( 2 , "block2" ) ;
1287+ chain_store. blocks . insert ( 1 , vec ! [ block1. clone( ) ] ) ;
1288+ chain_store. blocks . insert ( 2 , vec ! [ block2. clone( ) ] ) ;
1289+
1290+ let block_numbers: HashSet < _ > = vec ! [ 1 , 2 ] . into_iter ( ) . collect ( ) ;
1291+
1292+ let ( blocks, missing) =
1293+ fetch_unique_blocks_from_cache ( & logger, Arc :: new ( chain_store) , block_numbers) . await ;
1294+
1295+ assert_eq ! ( blocks. len( ) , 2 ) ;
1296+ assert ! ( blocks. iter( ) . any( |b| b. number == 1 ) ) ;
1297+ assert ! ( blocks. iter( ) . any( |b| b. number == 2 ) ) ;
1298+ assert ! ( missing. is_empty( ) ) ;
1299+ }
1300+
1301+ #[ tokio:: test]
1302+ async fn test_fetch_unique_blocks_mixed_scenario ( ) {
1303+ let logger = Logger :: root ( slog:: Discard , o ! ( ) ) ;
1304+ let mut chain_store = MockChainStore :: default ( ) ;
1305+
1306+ // Add a mix of scenarios:
1307+ // - Block 1: Single valid block
1308+ // - Block 2: Multiple blocks (duplicate)
1309+ // - Block 3: Missing
1310+ let block1 = create_test_block ( 1 , "block1" ) ;
1311+ let block2a = create_test_block ( 2 , "block2a" ) ;
1312+ let block2b = create_test_block ( 2 , "block2b" ) ;
1313+
1314+ chain_store. blocks . insert ( 1 , vec ! [ block1. clone( ) ] ) ;
1315+ chain_store
1316+ . blocks
1317+ . insert ( 2 , vec ! [ block2a. clone( ) , block2b. clone( ) ] ) ;
1318+
1319+ let block_numbers: HashSet < _ > = vec ! [ 1 , 2 , 3 ] . into_iter ( ) . collect ( ) ;
1320+
1321+ let ( blocks, missing) =
1322+ fetch_unique_blocks_from_cache ( & logger, Arc :: new ( chain_store) , block_numbers) . await ;
1323+
1324+ assert_eq ! ( blocks. len( ) , 1 ) ;
1325+ assert_eq ! ( blocks[ 0 ] . number, 1 ) ;
1326+ assert_eq ! ( missing. len( ) , 2 ) ;
1327+ assert ! ( missing. contains( & 2 ) ) ;
1328+ assert ! ( missing. contains( & 3 ) ) ;
1329+ }
1330+ }
0 commit comments