diff --git a/packages/tracker/src/services/block/block.service.ts b/packages/tracker/src/services/block/block.service.ts index 0854c137..ecdd5d48 100644 --- a/packages/tracker/src/services/block/block.service.ts +++ b/packages/tracker/src/services/block/block.service.ts @@ -9,76 +9,48 @@ import { Block } from 'bitcoinjs-lib'; import { TxService } from '../tx/tx.service'; import { BlockHeader } from '../../common/types'; import { Constants } from '../../common/constants'; -import { CommonService } from '../common/common.service'; @Injectable() export class BlockService implements OnModuleInit { private readonly logger = new Logger(BlockService.name); - private readonly genesisBlockHeight: number; constructor( - private dataSource: DataSource, - private readonly rpcService: RpcService, - private readonly txService: TxService, - private readonly configService: ConfigService, - private readonly commonService: CommonService, - @InjectRepository(BlockEntity) - private blockEntityRepository: Repository, + private dataSource: DataSource, + private readonly rpcService: RpcService, + private readonly txService: TxService, + private configService: ConfigService, + @InjectRepository(BlockEntity) + private blockEntityRepository: Repository, ) { this.genesisBlockHeight = this.configService.get('genesisBlockHeight'); } async onModuleInit() { - await this.checkRpcConnection(); - await this.checkDatabaseConnection(); - await this.processForceReindex(); - this.daemonProcessBlocks(); - this.logger.log('daemon process blocks initialized'); + this.logger.log('Daemon process blocks initialized'); } - /** - * process reindex from a block height - */ private async processForceReindex() { - if (process.env.REINDEX_BLOCK_HEIGHT !== undefined) { - const reindexHeight = Math.max( - parseInt(process.env.REINDEX_BLOCK_HEIGHT), - this.genesisBlockHeight, - ); + const reindexHeight = this.getReindexHeight(); + if (reindexHeight !== null) { await this.deleteBlocks(reindexHeight); - this.logger.log(`reindex from height ${reindexHeight}`); + this.logger.log(`Reindexing from height ${reindexHeight}`); } } - private async checkRpcConnection() { - await this.rpcService.getBlockchainInfo(true, true); - this.logger.log('rpc connection established'); - } - - private async checkDatabaseConnection() { - try { - await this.blockEntityRepository.count(); - this.logger.log('database connection established'); - } catch { - throw new Error('database not ready, run `yarn migration:run` first'); - } + private getReindexHeight(): number | null { + const reindexHeight = process.env.REINDEX_BLOCK_HEIGHT; + return reindexHeight !== undefined + ? Math.max(parseInt(reindexHeight), this.genesisBlockHeight) + : null; } - /** - * delete blocks with height greater than or equal to the given height - * @param height the start block height to be deleted - */ private async deleteBlocks(height: number) { await this.dataSource.manager.transaction(async (manager) => { await Promise.all([ - // delete blocks with height greater than or equal to the given height - manager.delete(BlockEntity, { - height: MoreThanOrEqual(height), - }), - // delete related tx in database + manager.delete(BlockEntity, { height: MoreThanOrEqual(height) }), this.txService.deleteTx(manager, height), ]); }); @@ -89,141 +61,181 @@ export class BlockService implements OnModuleInit { try { await this.processBlocks(); } catch (e) { - this.logger.error(`daemon process blocks error, ${e.message}`); + this.logger.error(`Daemon process blocks error: ${e.message}`); + await sleep(Constants.BLOCK_PROCESSING_INTERVAL); } } } private async processBlocks() { - // query last processed block in database - const lastProcessedBlock = await this.commonService.getLastProcessedBlock(); - // the potential next height to be processed is the height of last processed block plus one - // or the genesis block height if this is the first time run + const lastProcessedBlock = await this.getLastProcessedBlock(); const nextHeight = lastProcessedBlock - ? lastProcessedBlock.height + 1 - : this.genesisBlockHeight; - // get block hash by height to check the existence of the next block - // if cannot get a result, then there is no new block to process + ? lastProcessedBlock.height + 1 + : this.genesisBlockHeight; + const nextHash = await this.getBlockHash(nextHeight); if (!nextHash) { + this.logger.warn(`No hash found for height ${nextHeight}, retrying in ${Constants.BLOCK_PROCESSING_INTERVAL / 1000} seconds...`); await sleep(Constants.BLOCK_PROCESSING_INTERVAL); return; } - // lastProcessedBlock - // v - // database: [ ] -- [ ] -- [ ] - // \ -- [ ] - // ^ - // nextHash - // nextHeader - // - // lastProcessedBlock - // v - // database: [ ] -- [ ] -- [ ] - // \ - // \ -- [ ] -- [ ] -- [ ] - // ^ ^ - // nextHeader nextHash + const nextHeader = await this.processReorg(nextHash); await this.processBlock(nextHeader); } - /** - * process reorg if needed, and return the header of the right next block to process - * @param nextHash block hash of potential next block - */ private async processReorg(nextHash: string): Promise { let nextHeader: BlockHeader; let hash = nextHash; - // backtrack blocks from a block hash until - // the corresponding block record appears in the database, - // or the genesis block is reached. + while (true) { nextHeader = await this.getBlockHeader(hash); - if (nextHeader.height === this.genesisBlockHeight) { - break; - } - const existed = await this.blockEntityRepository.exists({ + + if (nextHeader.height === this.genesisBlockHeight) break; + + const exists = await this.blockEntityRepository.exists({ where: { hash: nextHeader.previousblockhash }, }); - if (existed) { - break; - } + + if (exists) break; + hash = nextHeader.previousblockhash; } + if (nextHeader.hash !== nextHash) { - // found reorg this.logger.log( - `found reorg, common ancestor #${nextHeader.height - 1} ${nextHeader.previousblockhash}`, + `Reorg detected: common ancestor at height #${nextHeader.height - 1}, ${nextHeader.previousblockhash}`, ); await this.deleteBlocks(nextHeader.height); } + return nextHeader; } private async processBlock(blockHeader: BlockHeader) { const rawBlock = await this.getRawBlock(blockHeader.hash); const block = Block.fromHex(rawBlock); + if (block.transactions.length === 0) { - throw new Error('no txs in block'); + this.logger.warn(`Block #${blockHeader.height} has no transactions`); + return; } - const startTs = Date.now(); - // process all the block txs one by one in order + + const startTime = Date.now(); + let catTxsCount = 0; let catProcessingTime = 0; + for (let i = 0; i < block.transactions.length; i++) { - const ms = await this.txService.processTx( - block.transactions[i], - i, - blockHeader, - ); - if (ms !== undefined) { - catTxsCount += 1; - catProcessingTime += ms; + try { + const processingTime = await this.txService.processTx(block.transactions[i], i, blockHeader); + + if (processingTime !== undefined) { + catTxsCount += 1; + catProcessingTime += processingTime; + } + } catch (e) { + this.logger.error(`Error processing transaction ${i} in block #${blockHeader.height}: ${e.message}`); } } - // save block + await this.blockEntityRepository.save({ ...blockHeader, previousHash: blockHeader.previousblockhash, }); - let _percentage = ''; - const latestBlockHeight = (await this.commonService.getBlockchainInfo()) - ?.headers; - if (latestBlockHeight && latestBlockHeight !== 0) { - _percentage = `[${( - (blockHeader.height / latestBlockHeight) * - 100 - ).toFixed(2)}%] `.padStart(10, ' '); - } - const processingTime = Math.ceil(Date.now() - startTs); - const tps = Math.ceil((block.transactions.length / processingTime) * 1000); + this.logBlockProcessing(blockHeader, block.transactions.length, startTime, catTxsCount, catProcessingTime); + } + + private async logBlockProcessing( + blockHeader: BlockHeader, + txCount: number, + startTime: number, + catTxsCount: number, + catProcessingTime: number + ) { + const latestBlockHeight = (await this.getBlockchainInfo())?.headers; + const percentage = latestBlockHeight + ? `[${((blockHeader.height / latestBlockHeight) * 100).toFixed(2)}%]`.padStart(10, ' ') + : ''; + + const processingTime = Math.ceil(Date.now() - startTime); + const tps = Math.ceil((txCount / processingTime) * 1000); const catTps = Math.ceil((catTxsCount / catProcessingTime) * 1000); - const _txsCount = `${block.transactions.length} txs`.padStart(8, ' '); - const _time = `${processingTime} ms`.padStart(8, ' '); - const _tps = `${tps} tps`.padStart(8, ' '); - const _catTxsCount = `${catTxsCount} txs`.padStart(8, ' '); - const _catTime = `${catProcessingTime} ms`.padStart(8, ' '); - const _catTps = `${catTps} tps`.padStart(8, ' '); this.logger.log( - `${_percentage}processed block #${blockHeader.height} ${blockHeader.hash}, ${_txsCount} ${_time} ${_tps}, ${_catTxsCount} ${_catTime} ${_catTps}`, + `${percentage} ==== Processed block #${blockHeader.height} ${blockHeader.hash}, ` + + `${txCount} txs, ${processingTime} ms, ${tps} tps, ` + + `${catTxsCount} cat txs, ${catProcessingTime} ms, ${catTps} tps`, ); } private async getBlockHash(height: number): Promise { - const resp = await this.rpcService.getBlockHash(height); - return resp?.data?.result; + try { + const resp = await this.rpcService.getBlockHash(height); + return resp?.data?.result; + } catch (error) { + this.logger.error(`Error getting block hash for height ${height}: ${error.message}`); + return undefined; + } } private async getBlockHeader(blockHash: string): Promise { - const resp = await this.rpcService.getBlockHeader(blockHash); - return resp.data.result; + try { + const resp = await this.rpcService.getBlockHeader(blockHash); + return resp.data.result; + } catch (error) { + this.logger.error(`Error getting block header for hash ${blockHash}: ${error.message}`); + throw error; + } } private async getRawBlock(blockHash: string): Promise { - const resp = await this.rpcService.getBlock(blockHash); - return resp.data.result; + try { + const resp = await this.rpcService.getBlock(blockHash); + return resp.data.result; + } catch (error) { + this.logger.error(`Error getting raw block for hash ${blockHash}: ${error.message}`); + throw error; + } } + + public async getLastProcessedBlock(): Promise { + try { + const blocks = await this.blockEntityRepository.find({ + take: 1, + order: { height: 'DESC' }, + }); + return blocks[0] || null; + } catch (error) { + this.logger.error(`Error getting last processed block: ${error.message}`); + throw error; + } + } + + public async getLastProcessedBlockHeight(): Promise { + const block = await this.getLastProcessedBlock(); + return block?.height || null; + } + + public async getBlockchainInfo() { + this.logger.log('Fetching blockchain info...'); + try { + const resp = await this.rpcService.getBlockchainInfo(); + this.logger.log(`Blockchain info fetched: ${JSON.stringify(resp?.data?.result)}`); + return resp?.data?.result; + } catch (error) { + this.logger.error(`Error fetching blockchain info: ${error.message}`); + try { + this.logger.log('Retrying to fetch blockchain info...'); + const resp = await this.rpcService.getBlockchainInfo(); + this.logger.log(`Blockchain info fetched on retry: ${JSON.stringify(resp?.data?.result)}`); + return resp?.data?.result; + } catch (retryError) { + this.logger.error(`Error fetching blockchain info on retry: ${retryError.message}`); + throw retryError; + } + } + } + }