Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 30 additions & 17 deletions executors/src/eoa/store/submitted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,21 +339,18 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
continue;
}

// Process each unique transaction_id once
if processed_ids.insert(tx.transaction_id()) {
// Do business logic only once per unique transaction_id
let is_first_occurrence = processed_ids.insert(tx.transaction_id());

if is_first_occurrence {
match (tx.transaction_id(), confirmed_ids.get(tx.transaction_id())) {
// if the transaction id is noop, we don't do anything but still clean up
(NO_OP_TRANSACTION_ID, _) => {
// Clean up noop transaction from Redis
self.remove_transaction_from_redis_submitted_zset(pipeline, tx);
report.noop_count += 1;
}

// in case of a valid ID, we check if it's in the confirmed transactions
// if it is confirmed, we succeed it and queue success jobs
// SCENARIO 1: Transaction ID was confirmed - do confirmed business logic
(id, Some(confirmed_tx)) => {
// Clean up confirmed transaction from Redis
self.remove_transaction_from_redis_submitted_zset(pipeline, tx);
let data_key_name = self.keys.transaction_data_key_name(id);
pipeline.hset(&data_key_name, "status", "confirmed");
pipeline.hset(&data_key_name, "completed_at", now);
Expand Down Expand Up @@ -392,26 +389,20 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
&mut tx_context,
self.webhook_queue.clone(),
) {
tracing::error!("Failed to queue webhook for fail: {}", e);
tracing::error!("Failed to queue webhook for confirmed transaction: {}", e);
}
}
}

report.moved_to_success += 1;
}

// if the ID is not in the confirmed transactions, we queue it for pending
// SCENARIO 2: Transaction ID was NOT confirmed - check if we should replace it
_ => {
if let SubmittedTransactionHydrated::Real(tx) = tx {
// Only move to pending (replaced) if it's within the latest transaction count range
// This prevents false replacements due to flashblocks propagation delays
if tx_nonce <= self.transaction_counts.latest {
// Clean up replaced transaction from Redis
self.remove_transaction_from_redis_submitted_zset(
pipeline,
&SubmittedTransactionHydrated::Real(tx.clone()),
);

// zadd_multiple expects (score, member)
replaced_transactions
.push((tx.queued_at, tx.transaction_id.clone()));
Expand All @@ -434,7 +425,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
&mut tx_context,
self.webhook_queue.clone(),
) {
tracing::error!("Failed to queue webhook for fail: {}", e);
tracing::error!("Failed to queue webhook for replaced transaction: {}", e);
}
}

Expand All @@ -451,11 +442,33 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
"Keeping transaction in submitted state due to potential flashblocks propagation delay"
);
// Don't increment any counter - transaction stays in submitted state
// Note: We still need to check if we should clean up this hash below
}
}
}
}
}

// Redis cleanup logic: Clean up hashes based on transaction-level decisions
match (tx.transaction_id(), confirmed_ids.get(tx.transaction_id())) {
// SCENARIO 1: Transaction ID was confirmed - remove ALL hashes (even if beyond latest due to flashblocks)
(_, Some(_)) => {
self.remove_transaction_from_redis_submitted_zset(pipeline, tx);
}
// NOOP transactions: always clean up
(NO_OP_TRANSACTION_ID, _) => {
self.remove_transaction_from_redis_submitted_zset(pipeline, tx);
}
// SCENARIO 2: Transaction ID was NOT confirmed - only clean up if within latest confirmed nonce range
_ => {
if let SubmittedTransactionHydrated::Real(_) = tx {
if tx_nonce <= self.transaction_counts.latest {
self.remove_transaction_from_redis_submitted_zset(pipeline, tx);
}
// If beyond latest confirmed nonce, keep in submitted state (don't clean up)
}
}
}
}

if !replaced_transactions.is_empty() {
Expand Down