11//! Hermes RTE events inner implementations.
22
3+ use anyhow:: Context ;
34use shared:: {
45 bindings:: hermes:: cardano:: {
56 self ,
67 api:: { Block , SubscriptionId } ,
78 } ,
89 database:: staked_ada:: {
9- create_tables, insert_txi_by_txn_id, insert_txo_assets_by_stake, insert_txo_by_stake,
10+ create_tables, delete_stake_registration_before_slot, delete_stake_registration_since_slot,
11+ delete_txi_before_slot, delete_txi_since_slot, delete_txo_assets_before_slot,
12+ delete_txo_assets_since_slot, delete_txo_before_slot, delete_txo_since_slot,
13+ insert_txi_by_txn_id, insert_txo_assets_by_stake, insert_txo_by_stake,
1014 } ,
1115 utils:: {
12- log:: { error , info, trace} ,
16+ log:: { info, trace} ,
1317 sqlite,
1418 } ,
1519} ;
@@ -25,8 +29,7 @@ pub fn init() -> anyhow::Result<()> {
2529 let mut tx = conn. begin ( ) ?;
2630 create_tables ( & mut tx) ?;
2731 if let Some ( q) = config:: INIT_SQL_QUERY {
28- tx. execute ( q)
29- . inspect_err ( |error| error ! ( error: %; "Failed to execute init sql query" ) ) ?;
32+ tx. execute ( q) . context ( "Failed to execute init sql query" ) ?;
3033 }
3134 tx. commit ( ) ?;
3235 }
@@ -38,17 +41,8 @@ pub fn init() -> anyhow::Result<()> {
3841
3942 let network = cardano:: api:: CardanoNetwork :: Preprod ;
4043
41- let network_resource = cardano:: api:: Network :: new ( network)
42- . inspect_err ( |error| error ! ( error: %, network: ?; "Failed to create network resource" ) ) ?;
43- let subscription_id_resource = network_resource
44- . subscribe_block ( config:: SUBSCRIBE_FROM )
45- . inspect_err ( |error| {
46- error ! (
47- error: %,
48- subscribe_from: ? = config:: SUBSCRIBE_FROM ;
49- "Failed to subscribe block from"
50- ) ;
51- } ) ?;
44+ let network_resource = cardano:: api:: Network :: new ( network) ?;
45+ let subscription_id_resource = network_resource. subscribe_block ( config:: SUBSCRIBE_FROM ) ?;
5246
5347 info ! (
5448 target: "staked_ada_indexer::init" ,
@@ -66,59 +60,61 @@ pub fn on_cardano_block(
6660 subscription_id : & SubscriptionId ,
6761 block : & Block ,
6862) -> anyhow:: Result < ( ) > {
63+ trace ! (
64+ target: "staked_ada_indexer::on_cardano_block" ,
65+ slot_no = block. get_slot( ) ,
66+ is_immutable = block. is_immutable( ) ;
67+ "💫 Handling cardano block..."
68+ ) ;
69+
70+ if block. is_rollback ( ) ? {
71+ trace ! (
72+ target: "staked_ada_indexer::on_cardano_block" ,
73+ "💫 Block is the first block of a rollback. Removing volatile database records..."
74+ ) ;
75+
76+ let mut conn = sqlite:: Connection :: open ( true ) ?;
77+ let mut tx = conn. begin ( ) ?;
78+ delete_stake_registration_since_slot ( & mut tx, block. get_slot ( ) ) ?;
79+ delete_txi_since_slot ( & mut tx, block. get_slot ( ) ) ?;
80+ delete_txo_since_slot ( & mut tx, block. get_slot ( ) ) ?;
81+ delete_txo_assets_since_slot ( & mut tx, block. get_slot ( ) ) ?;
82+ tx. commit ( ) ?;
83+
84+ trace ! (
85+ target: "staked_ada_indexer::on_cardano_block" ,
86+ slot_no = block. get_slot( ) ,
87+ is_immutable = block. is_immutable( ) ;
88+ "💫 Volatile database records removed. Rollback handled"
89+ ) ;
90+ }
91+
6992 let block = block. to_catalyst_type ( subscription_id. get_network ( ) ) ?;
7093 let mut conn = sqlite:: Connection :: open ( !block. is_immutable ( ) ) ?;
7194
7295 trace ! (
7396 target: "staked_ada_indexer::on_cardano_block" ,
74- slot_no = u64 :: from( block. slot( ) ) ,
75- is_immutable = block. is_immutable( ) ;
76- "Indexing block..."
97+ "💫 Indexing block..."
7798 ) ;
7899
79100 let mut buffers = index:: Buffers :: default ( ) ;
80101 buffers. index_block ( & block) ;
81102
82103 trace ! (
83104 target: "staked_ada_indexer::on_cardano_block" ,
84- slot_no = u64 :: from( block. slot( ) ) ;
85- "Block is indexed. Inserting block data into database..."
105+ "💫 Block is indexed. Inserting block data into database..."
86106 ) ;
87107
88108 // Assume everything is broken if one of the inserts fails.
89109 let mut sql_tx = conn. begin ( ) ?;
90- insert_txo_by_stake ( & mut sql_tx, buffers. txo_by_stake ) . map_err ( |( _, error) | {
91- error ! (
92- target: "staked_ada_indexer::on_cardano_block" ,
93- error: %,
94- slot_no = u64 :: from( block. slot( ) ) ;
95- "Failed to insert txo by stake" ) ;
96- error
97- } ) ?;
98- insert_txo_assets_by_stake ( & mut sql_tx, buffers. txo_assets_by_stake ) . map_err (
99- |( _, error) | {
100- error ! (
101- target: "staked_ada_indexer::on_cardano_block" ,
102- error: %,
103- slot_no = u64 :: from( block. slot( ) ) ;
104- "Failed to insert txo assets by stake" ) ;
105- error
106- } ,
107- ) ?;
108- insert_txi_by_txn_id ( & mut sql_tx, buffers. txi_by_txn_id ) . map_err ( |( _, error) | {
109- error ! (
110- target: "staked_ada_indexer::on_cardano_block" ,
111- error: %,
112- slot_no = u64 :: from( block. slot( ) ) ;
113- "Failed to insert txi by txn id" ) ;
114- error
115- } ) ?;
110+ insert_txo_by_stake ( & mut sql_tx, buffers. txo_by_stake ) . map_err ( |( _, e) | e) ?;
111+ insert_txo_assets_by_stake ( & mut sql_tx, buffers. txo_assets_by_stake ) . map_err ( |( _, e) | e) ?;
112+ insert_txi_by_txn_id ( & mut sql_tx, buffers. txi_by_txn_id ) . map_err ( |( _, e) | e) ?;
116113 sql_tx. commit ( ) ?;
117114
118115 trace ! (
119116 target: "staked_ada_indexer::on_cardano_block" ,
120- slot_no = u64 :: from( block. slot( ) ) ;
121- "Block data is inserted. Handled event"
117+ "💫 Block data is inserted. Handled event"
122118 ) ;
123119
124120 Ok ( ( ) )
@@ -129,19 +125,53 @@ pub fn on_cardano_immutable_roll_forward(
129125 subscription_id : & SubscriptionId ,
130126 block : & Block ,
131127) -> anyhow:: Result < ( ) > {
132- let _block = block. to_catalyst_type ( subscription_id. get_network ( ) ) ;
133- let conn = sqlite:: Connection :: open ( false ) ?;
134- let _conn_volatile = sqlite:: Connection :: open ( true ) ?;
128+ trace ! (
129+ target: "staked_ada_indexer::on_cardano_immutable_roll_forward" ,
130+ slot_no = block. get_slot( ) ,
131+ is_immutable = block. is_immutable( ) ;
132+ "💫 Handling immutable roll forward..."
133+ ) ;
134+
135+ let network_resource = cardano:: api:: Network :: new ( subscription_id. get_network ( ) ) ?;
136+ let Some ( ( immutable, mutable) ) = network_resource. get_tips ( ) else {
137+ anyhow:: bail!( "Failed to get tips" ) ;
138+ } ;
139+
140+ // Only process immutable roll forward when it reaches the tip.
141+ // In case a block is not at the tip, do nothing.
142+ if mutable != block. get_slot ( ) {
143+ trace ! (
144+ target: "staked_ada_indexer::on_cardano_immutable_roll_forward" ,
145+ "💫 Block is not at the tip – skipping. Handled event."
146+ ) ;
147+ return Ok ( ( ) ) ;
148+ }
149+
150+ trace ! (
151+ target: "staked_ada_indexer::on_cardano_immutable_roll_forward" ,
152+ "💫 Updating block subscription..."
153+ ) ;
154+
155+ network_resource. subscribe_block ( cardano:: api:: SyncSlot :: Specific ( immutable) ) ?;
156+ subscription_id. unsubscribe ( ) ;
157+
158+ trace ! (
159+ target: "staked_ada_indexer::on_cardano_immutable_roll_forward" ,
160+ "💫 Subscription updated. Removing volatile database records..."
161+ ) ;
162+
163+ let mut conn = sqlite:: Connection :: open ( true ) ?;
135164
136- // Simple mock, propagating slot_no.
137- let ( slot_no, ) = conn
138- . prepare ( "SELECT ?" ) ?
139- . query_one_as :: < ( u64 , ) > ( & [ & block. get_slot ( ) . try_into ( ) ?] ) ?;
165+ let mut tx = conn. begin ( ) ?;
166+ delete_stake_registration_before_slot ( & mut tx, block. get_slot ( ) ) ?;
167+ delete_txo_before_slot ( & mut tx, block. get_slot ( ) ) ?;
168+ delete_txo_assets_before_slot ( & mut tx, block. get_slot ( ) ) ?;
169+ delete_txi_before_slot ( & mut tx, block. get_slot ( ) ) ?;
170+ tx. commit ( ) ?;
140171
141172 trace ! (
142173 target: "staked_ada_indexer::on_cardano_immutable_roll_forward" ,
143- slot_no;
144- "Handled event"
174+ "💫 Volatile data removed. Handled event"
145175 ) ;
146176 Ok ( ( ) )
147177}
0 commit comments