Skip to content

Commit 317ce7a

Browse files
committed
chain/ethereum, graph: refactor firehose block stream to cache firehose blocks
1 parent e44444f commit 317ce7a

File tree

9 files changed

+82
-19
lines changed

9 files changed

+82
-19
lines changed

chain/arweave/src/chain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ impl Blockchain for Chain {
144144
});
145145

146146
Ok(Box::new(FirehoseBlockStream::new(
147+
self.chain_store(),
147148
deployment.hash,
148149
self.chain_client(),
149150
store.block_ptr(),

chain/cosmos/src/chain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ impl Blockchain for Chain {
137137
});
138138

139139
Ok(Box::new(FirehoseBlockStream::new(
140+
self.chain_store(),
140141
deployment.hash,
141142
self.chain_client(),
142143
store.block_ptr(),

chain/ethereum/src/chain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
100100
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });
101101

102102
Ok(Box::new(FirehoseBlockStream::new(
103+
chain.chain_store(),
103104
deployment.hash,
104105
chain.chain_client(),
105106
subgraph_current_block,

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1719,10 +1719,16 @@ impl EthereumAdapterTrait for EthereumAdapter {
17191719

17201720
let mut blocks: Vec<Arc<BlockPtrExt>> = blocks_map
17211721
.into_iter()
1722-
.filter_map(|(_number, values)| {
1722+
.filter_map(|(number, values)| {
17231723
if values.len() == 1 {
17241724
Arc::new(values[0].clone()).into()
17251725
} else {
1726+
warn!(
1727+
&logger,
1728+
"Expected one block for block number {:?}, found {}",
1729+
number,
1730+
values.len()
1731+
);
17261732
None
17271733
}
17281734
})
@@ -1739,6 +1745,8 @@ impl EthereumAdapterTrait for EthereumAdapter {
17391745
"Loading {} block(s) not in the block cache",
17401746
missing_blocks.len()
17411747
);
1748+
1749+
debug!(logger, "Missing blocks {:?}", missing_blocks);
17421750
}
17431751

17441752
Box::new(

chain/near/src/chain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
136136
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });
137137

138138
Ok(Box::new(FirehoseBlockStream::new(
139+
chain.chain_store(),
139140
deployment.hash,
140141
chain.chain_client(),
141142
subgraph_current_block,

chain/starknet/src/chain.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ impl BlockStreamBuilder<Chain> for StarknetStreamBuilder {
227227
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });
228228

229229
Ok(Box::new(FirehoseBlockStream::new(
230+
chain.chain_store(),
230231
deployment.hash,
231232
chain.chain_client(),
232233
subgraph_current_block,

graph/src/blockchain/firehose_block_stream.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use super::block_stream::{
44
use super::client::ChainClient;
55
use super::Blockchain;
66
use crate::blockchain::block_stream::FirehoseCursor;
7-
use crate::blockchain::TriggerFilter;
7+
use crate::blockchain::{Block, TriggerFilter};
88
use crate::prelude::*;
99
use crate::util::backoff::ExponentialBackoff;
1010
use crate::{firehose, firehose::FirehoseEndpoint};
@@ -108,6 +108,7 @@ where
108108
C: Blockchain,
109109
{
110110
pub fn new<F>(
111+
chain_store: Arc<dyn ChainStore>,
111112
deployment: DeploymentHash,
112113
client: Arc<ChainClient<C>>,
113114
subgraph_current_block: Option<BlockPtr>,
@@ -134,6 +135,7 @@ where
134135
let metrics = FirehoseBlockStreamMetrics::new(registry, deployment.clone());
135136
FirehoseBlockStream {
136137
stream: Box::pin(stream_blocks(
138+
chain_store,
137139
client,
138140
cursor,
139141
deployment,
@@ -148,6 +150,7 @@ where
148150
}
149151

150152
fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
153+
chain_store: Arc<dyn ChainStore>,
151154
client: Arc<ChainClient<C>>,
152155
mut latest_cursor: FirehoseCursor,
153156
deployment: DeploymentHash,
@@ -257,6 +260,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
257260

258261
for await response in stream {
259262
match process_firehose_response(
263+
chain_store.clone(),
260264
&endpoint,
261265
response,
262266
&mut check_subgraph_continuity,
@@ -344,6 +348,7 @@ enum BlockResponse<C: Blockchain> {
344348
}
345349

346350
async fn process_firehose_response<C: Blockchain, F: FirehoseMapper<C>>(
351+
chain_store: Arc<dyn ChainStore>,
347352
endpoint: &Arc<FirehoseEndpoint>,
348353
result: Result<firehose::Response, Status>,
349354
check_subgraph_continuity: &mut bool,
@@ -359,11 +364,46 @@ async fn process_firehose_response<C: Blockchain, F: FirehoseMapper<C>>(
359364
.await
360365
.context("Mapping block to BlockStreamEvent failed")?;
361366

367+
if let BlockStreamEvent::ProcessBlock(block, _) = &event {
368+
info!(logger, "Inserting block to cache"; "block_number" => block.block.number(), "block_hash" => format!("{:?}", block.block.hash()));
369+
370+
let start_time = Instant::now();
371+
372+
let result = chain_store
373+
.insert_block(Arc::new(block.block.clone()))
374+
.await;
375+
376+
let elapsed = start_time.elapsed();
377+
378+
match result {
379+
Ok(_) => {
380+
trace!(
381+
logger,
382+
"Block inserted to cache successfully";
383+
"block_number" => block.block.number(),
384+
"block_hash" => format!("{:?}", block.block.hash()),
385+
"time_taken" => format!("{:?}", elapsed)
386+
);
387+
}
388+
Err(e) => {
389+
error!(
390+
logger,
391+
"Failed to insert block into store";
392+
"block_number" => block.block.number(),
393+
"block_hash" => format!("{:?}", block.block.hash()),
394+
"error" => format!("{:?}", e),
395+
"time_taken" => format!("{:?}", elapsed)
396+
);
397+
}
398+
}
399+
}
400+
362401
if *check_subgraph_continuity {
363402
info!(logger, "Firehose started from a subgraph pointer without an existing cursor, ensuring chain continuity");
364403

365404
if let BlockStreamEvent::ProcessBlock(ref block, _) = event {
366405
let previous_block_ptr = block.parent_ptr();
406+
367407
if previous_block_ptr.is_some() && previous_block_ptr.as_ref() != subgraph_current_block
368408
{
369409
warn!(&logger,

graph/src/components/store/traits.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,8 @@ pub trait ChainStore: Send + Sync + 'static {
445445
/// Insert a block into the store (or update if they are already present).
446446
async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error>;
447447

448+
async fn insert_block(&self, block: Arc<dyn Block>) -> Result<(), Error>;
449+
448450
fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error>;
449451

450452
/// Try to update the head block pointer to the block with the highest block number.

store/postgres/src/chain_store.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,6 +1956,27 @@ impl ChainStore {
19561956

19571957
Ok(block_map)
19581958
}
1959+
1960+
async fn save_block(&self, block: Arc<dyn Block>, allow_update: bool) -> Result<(), Error> {
1961+
// We should always have the parent block available to us at this point.
1962+
if let Some(parent_hash) = block.parent_hash() {
1963+
let block = JsonBlock::new(block.ptr(), parent_hash, block.data().ok());
1964+
self.recent_blocks_cache.insert_block(block);
1965+
}
1966+
1967+
let pool = self.pool.clone();
1968+
let network = self.chain.clone();
1969+
let storage = self.storage.clone();
1970+
pool.with_conn(move |conn, _| {
1971+
conn.transaction(|conn| {
1972+
storage
1973+
.upsert_block(conn, &network, block.as_ref(), allow_update)
1974+
.map_err(CancelableError::from)
1975+
})
1976+
})
1977+
.await
1978+
.map_err(Error::from)
1979+
}
19591980
}
19601981

19611982
fn json_block_to_block_ptr_ext(json_block: &JsonBlock) -> Result<BlockPtrExt, Error> {
@@ -1984,24 +2005,11 @@ impl ChainStoreTrait for ChainStore {
19842005
}
19852006

19862007
async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
1987-
// We should always have the parent block available to us at this point.
1988-
if let Some(parent_hash) = block.parent_hash() {
1989-
let block = JsonBlock::new(block.ptr(), parent_hash, block.data().ok());
1990-
self.recent_blocks_cache.insert_block(block);
1991-
}
2008+
self.save_block(block, true).await
2009+
}
19922010

1993-
let pool = self.pool.clone();
1994-
let network = self.chain.clone();
1995-
let storage = self.storage.clone();
1996-
pool.with_conn(move |conn, _| {
1997-
conn.transaction(|conn| {
1998-
storage
1999-
.upsert_block(conn, &network, block.as_ref(), true)
2000-
.map_err(CancelableError::from)
2001-
})
2002-
})
2003-
.await
2004-
.map_err(Error::from)
2011+
async fn insert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
2012+
self.save_block(block, false).await
20052013
}
20062014

20072015
fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error> {

0 commit comments

Comments
 (0)