feat(tx sender): add multiple write clients for more reliable tx submission #1740
Walkthrough
Updates the static version tag to v4.5.47, adds a new write_endpoints sender configuration, and refactors Sender to support a read client plus multiple write clients that broadcast signed transactions in parallel with chain ID verification and a 15s write timeout.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Caller
participant Sender
participant ReadClient
participant WriteClients as "WriteClients[1..N]"
Note over Sender,ReadClient: Initialization (dial read endpoint)
Caller->>Sender: NewSender(config..., service, name, senderType, db, reg)
Sender->>ReadClient: Dial Endpoint (read)
alt write_endpoints provided
loop for each write endpoint
Sender->>WriteClients: Dial write endpoint
Sender->>ReadClient: Verify chain ID matches
end
else no write_endpoints
Note over Sender: Fallback - use ReadClient for writes
end
Note over Caller,Sender: Sending transaction
Caller->>Sender: SendTransaction(signedTx)
par Parallel broadcasts (bounded by 15s)
Sender->>WriteClients: sendRawTransaction(signedTx)
and
Sender->>WriteClients: sendRawTransaction(signedTx)
end
alt any write succeeds
Sender-->>Caller: Success (tx hash) — partial failures logged
else all writes fail
Sender-->>Caller: Aggregated error
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
✅ Passed checks (3 passed)
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
rollup/internal/controller/sender/sender.go (3)
95-139: Avoid leaking API keys in logs/errors; degrade gracefully on bad write endpoints.
- Logging full endpoint URLs at info/error can expose provider API keys (e.g., Infura/Alchemy). Log only hosts or redacted URLs.
- Don’t fail NewSender if one write endpoint is bad; skip it, warn, and proceed if at least one valid writer remains.
Apply this diff to filter/write labels and sanitize logs:
@@
- // Initialize write clients
- var writeClients []*ethclient.Client
+ // Initialize write clients
+ var writeClients []*ethclient.Client
+ var writeEndpointLabels []string // sanitized labels aligned with writeClients
  if len(config.WriteEndpoints) > 0 {
      // Use specified write endpoints
-     for i, endpoint := range config.WriteEndpoints {
-         writeRpcClient, err := rpc.Dial(endpoint)
+     for i, endpoint := range config.WriteEndpoints {
+         writeRpcClient, err := rpc.Dial(endpoint)
          if err != nil {
-             return nil, fmt.Errorf("failed to dial write client %d (endpoint: %s), err: %w", i, endpoint, err)
+             log.Warn("failed to dial write client; skipping", "index", i, "endpoint", endpointLabel(endpoint), "err", err)
+             continue
          }
          writeClient := ethclient.NewClient(writeRpcClient)
          // Verify the write client is connected to the same chain
          writeChainID, err := writeClient.ChainID(ctx)
          if err != nil {
-             return nil, fmt.Errorf("failed to get chain ID from write client %d (endpoint: %s), err: %w", i, endpoint, err)
+             log.Warn("failed to get chain ID from write client; skipping", "index", i, "endpoint", endpointLabel(endpoint), "err", err)
+             writeClient.Close()
+             continue
          }
          if writeChainID.Cmp(chainID) != 0 {
-             return nil, fmt.Errorf("write client %d (endpoint: %s) has different chain ID %s, expected %s", i, endpoint, writeChainID.String(), chainID.String())
+             log.Warn("write client has different chain ID; skipping", "index", i, "endpoint", endpointLabel(endpoint), "have", writeChainID.String(), "want", chainID.String())
+             writeClient.Close()
+             continue
          }
-         writeClients = append(writeClients, writeClient)
+         writeClients = append(writeClients, writeClient)
+         writeEndpointLabels = append(writeEndpointLabels, endpointLabel(endpoint))
      }
-     log.Info("initialized sender with multiple write clients", "service", service, "name", name, "readEndpoint", config.Endpoint, "writeEndpoints", config.WriteEndpoints)
+     if len(writeClients) == 0 {
+         return nil, fmt.Errorf("no valid write endpoints after filtering")
+     }
+     log.Info("initialized sender with multiple write clients", "service", service, "name", name, "readEndpoint", endpointLabel(config.Endpoint), "writeEndpoints", writeEndpointLabels)
  } else {
      // Use read client for writing (backward compatibility)
      writeClients = append(writeClients, client)
-     log.Info("initialized sender with single client", "service", service, "name", name, "endpoint", config.Endpoint)
+     writeEndpointLabels = append(writeEndpointLabels, endpointLabel(config.Endpoint))
+     log.Info("initialized sender with single client", "service", service, "name", name, "endpoint", endpointLabel(config.Endpoint))
  }
@@
- writeClients: writeClients,
+ writeClients: writeClients,
+ // store labels for logging to avoid indexing config on send path
+ // (add a field to Sender to hold these if preferred)
Add this helper and import:
@@
-import (
+import (
+    "net/url"
@@
 )
// Helper: return host as a safe label for an endpoint URL.
func endpointLabel(raw string) string {
    if u, err := url.Parse(raw); err == nil && u.Host != "" {
        return u.Host
    }
    return raw
}
If you prefer storing labels on the struct, add:
 type Sender struct {
@@
- writeClients []*ethclient.Client // The clients to send transactions to (write operations)
+ writeClients []*ethclient.Client // The clients to send transactions to (write operations)
+ writeEndpointLabels []string // Sanitized labels aligned with writeClients
and set
writeEndpointLabels: writeEndpointLabels,
in the constructor.
Also applies to: 146-147
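To illustrate why logging only the host keeps provider keys out of logs, here is a small standalone sketch. The URLs are made up, and `safeEndpointLabel` is a hypothetical variant of the helper suggested above: instead of falling back to the raw string when no host can be parsed, it returns a fixed placeholder, on the assumption that an unparseable endpoint might still contain a secret.

```go
package main

import (
	"fmt"
	"net/url"
)

// safeEndpointLabel returns only the host (and port, if any) of an endpoint URL.
// Path segments, query strings, and userinfo, which is where API keys usually
// live, are dropped. If no host can be determined, a placeholder is returned
// rather than the raw input.
func safeEndpointLabel(raw string) string {
	u, err := url.Parse(raw)
	if err != nil || u.Host == "" {
		return "<redacted>"
	}
	return u.Host
}

func main() {
	// Example URLs are illustrative only.
	fmt.Println(safeEndpointLabel("https://rpc.example.com/v3/SECRET_KEY"))
	fmt.Println(safeEndpointLabel("https://user:pw@rpc.example.com:8545/?key=abc"))
	fmt.Println(safeEndpointLabel("not a url"))
}
```

Whether to fall back to the raw string or to a placeholder is a trade-off between debuggability and safety; the placeholder is the more conservative choice.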
342-359: Don’t delete the pending record on benign send errors (e.g., “already known”).
If all writers return “already known”, the tx is in mempools but we’d delete its DB record and lose tracking. Keep the record and proceed.
Apply this diff:
- if err := s.sendTransactionToMultipleClients(signedTx); err != nil {
+ if err := s.sendTransactionToMultipleClients(signedTx); err != nil {
+     if strings.Contains(err.Error(), "already known") || strings.Contains(err.Error(), "known transaction") {
+         log.Info("tx already known by node(s); keeping pending record", "tx hash", signedTx.Hash().String())
+     } else {
      // Delete the transaction from the pending transaction table if it fails to send.
      if updateErr := s.pendingTransactionOrm.DeleteTransactionByTxHash(s.ctx, signedTx.Hash()); updateErr != nil {
          log.Error("failed to delete transaction", "tx hash", signedTx.Hash().String(), "from", s.transactionSigner.GetAddr().String(), "nonce", signedTx.Nonce(), "err", updateErr)
          return common.Hash{}, 0, fmt.Errorf("failed to delete transaction, err: %w", updateErr)
      }
      log.Error("failed to send tx", "tx hash", signedTx.Hash().String(), "from", s.transactionSigner.GetAddr().String(), "nonce", signedTx.Nonce(), "err", err)
      // Check if contain nonce, and reset nonce
      // only reset nonce when it is not from resubmit
      if strings.Contains(err.Error(), "nonce too low") {
          if err := s.resetNonce(); err != nil {
              log.Warn("failed to reset nonce after failed send transaction", "address", s.transactionSigner.GetAddr().String(), "err", err)
              return common.Hash{}, 0, fmt.Errorf("failed to reset nonce after failed send transaction, err: %w", err)
          }
      }
      return common.Hash{}, 0, fmt.Errorf("failed to send transaction, err: %w", err)
+     }
  }
757-787: Resubmission: classify “already known” as success; avoid DB rollback in that case.
If the replacement is already present, we shouldn’t roll back statuses.
Apply this diff:
- if err := s.sendTransactionToMultipleClients(newSignedTx); err != nil {
+ if err := s.sendTransactionToMultipleClients(newSignedTx); err != nil {
+     if strings.Contains(err.Error(), "already known") || strings.Contains(err.Error(), "known transaction") {
+         log.Info("replacing tx already known; skipping rollback", "tx hash", newSignedTx.Hash().String(), "nonce", newSignedTx.Nonce())
+         return
+     }
      if strings.Contains(err.Error(), "nonce too low") {
@@
      // SendTransaction failed, need to rollback the previous database changes
🧹 Nitpick comments (6)
rollup/conf/config.json (1)
43-47: Config supports multi-write; consider ops hardening.
- Expose the broadcast timeout as a config knob (e.g., write_broadcast_timeout_sec) to avoid baking 15s into code.
- Public RPCs can rate-limit blob txs; allow env overrides and document intended use per environment.
rollup/internal/config/relayer.go (1)
10-17: Comment + schema match behavior; future-proof with timeout knob.
Looks consistent with Sender logic (Endpoint for reads, WriteEndpoints for writes when present). Consider adding an optional write_broadcast_timeout_sec to this struct for configurability.
rollup/internal/controller/sender/sender.go (4)
71-74: Ensure RPC clients are closed to avoid FD/connection leaks.
We now maintain multiple ethclient.Clients; Stop() should close them (including the read client) to prevent leaks in long‑running processes or hot restarts.
Apply this diff to close clients on Stop:
 func (s *Sender) Stop() {
-    close(s.stopCh)
-    log.Info("sender stopped", "name", s.name, "service", s.service, "address", s.transactionSigner.GetAddr().String())
+    close(s.stopCh)
+    // Close write clients (avoid double-closing the shared read client)
+    for _, wc := range s.writeClients {
+        if wc != nil && wc != s.client {
+            wc.Close()
+        }
+    }
+    if s.client != nil {
+        s.client.Close()
+    }
+    log.Info("sender stopped", "name", s.name, "service", s.service, "address", s.transactionSigner.GetAddr().String())
 }
205-279: Treat “already known” as success; decouple endpoint label from config indexing; make timeout configurable.
- Classify “already known”/“known transaction” as success to avoid false failures.
- Use stored labels instead of indexing config.WriteEndpoints[i] to prevent future mismatches.
- Consider making 15s timeout configurable (config).
Apply this diff:
 func (s *Sender) sendTransactionToMultipleClients(signedTx *gethTypes.Transaction) error {
-    ctx, cancel := context.WithTimeout(s.ctx, 15*time.Second)
+    ctx, cancel := context.WithTimeout(s.ctx, 15*time.Second) // TODO: make configurable
     defer cancel()
@@
-    for i, client := range s.writeClients {
+    for i, client := range s.writeClients {
         wg.Add(1)
-        // Determine endpoint URL for this client
-        endpoint := s.config.WriteEndpoints[i]
+        // Determine endpoint label for this client
+        endpoint := "unknown"
+        if i < len(s.writeEndpointLabels) {
+            endpoint = s.writeEndpointLabels[i]
+        }
         go func(ep string, writeClient *ethclient.Client) {
             defer wg.Done()
-            err := writeClient.SendTransaction(ctx, signedTx)
+            err := writeClient.SendTransaction(ctx, signedTx)
+            if err != nil {
+                // Normalize benign errors as success
+                msg := err.Error()
+                if strings.Contains(msg, "already known") || strings.Contains(msg, "known transaction") {
+                    err = nil
+                }
+            }
             resultChan <- result{endpoint: ep, err: err}
         }(endpoint, client)
     }
834-834: Guard pending-number underflow.
header.Number could be 0 on fresh chains; subtracting 1 underflows uint64. Clamp at 0.
Apply this diff:
- return header.Number.Uint64() - 1, baseFee, blobBaseFee, nil
+ n := header.Number.Uint64()
+ if n > 0 {
+     n = n - 1
+ }
+ return n, baseFee, blobBaseFee, nil
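For reference, unsigned subtraction in Go wraps around rather than failing, which is why the guard matters. The snippet below demonstrates the wraparound and a clamped alternative; `previousBlock` is a hypothetical helper name used only for illustration.

```go
package main

import "fmt"

// previousBlock returns n-1, clamped at 0 so that block 0 does not wrap around.
func previousBlock(n uint64) uint64 {
	if n == 0 {
		return 0
	}
	return n - 1
}

func main() {
	var n uint64 // block number 0, as on a fresh chain
	fmt.Println(n - 1)            // prints 18446744073709551615 (wraps to the max uint64)
	fmt.Println(previousBlock(n)) // prints 0
}
```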
205-279: Optional: add per-endpoint metrics.
Track per-endpoint success/failure counters and latency to spot flaky providers and tune the set.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
common/version/version.go (1 hunks)
rollup/conf/config.json (1 hunks)
rollup/internal/config/relayer.go (1 hunks)
rollup/internal/controller/sender/sender.go (7 hunks)
🔇 Additional comments (2)
common/version/version.go (1)
8-8: Version bump looks good.
No logic changes; just ensure release notes and deployment artifacts pick up v4.5.47.
rollup/internal/controller/sender/sender.go (1)
10-10: Import of sync is appropriate for parallel writes.
…o feat/multiple-write-clients
Actionable comments posted: 0
🧹 Nitpick comments (1)
rollup/internal/controller/relayer/relayer_test.go (1)
59-59: Confirmed: NewSender does not dedupe write endpoints, so duplicates will create multiple clients and parallel writes to the same RPC.
NewSender iterates config.WriteEndpoints and appends a client per entry; SendTransaction sends to all s.writeClients in parallel. The test's duplicate endpoints therefore exercise the multi-write path but submit the same tx concurrently to the same RPC (can produce "already known"/rate-limit flakes).
- Action: document the intent in the test or use distinct endpoints; optionally dedupe endpoints in NewSender if you want to avoid same-RPC double-submits.
- cfg.L2Config.RelayerConfig.SenderConfig.WriteEndpoints = []string{cfg.L2Config.RelayerConfig.SenderConfig.Endpoint, cfg.L2Config.RelayerConfig.SenderConfig.Endpoint}
+ // Intentionally duplicate the endpoint to exercise the multi-write path in tests.
+ // Note: this submits the same tx twice to the same RPC and may produce "already known"
+ // responses; if flakes arise, either use a single element or spin up a second RPC.
+ cfg.L2Config.RelayerConfig.SenderConfig.WriteEndpoints = []string{
+     cfg.L2Config.RelayerConfig.SenderConfig.Endpoint,
+     cfg.L2Config.RelayerConfig.SenderConfig.Endpoint,
+ }
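If deduplication in NewSender is wanted, an order-preserving filter is enough. A minimal sketch; `dedupeEndpoints` is a hypothetical helper, and it compares endpoint strings exactly, so two different URLs pointing at the same node would not be merged.

```go
package main

import "fmt"

// dedupeEndpoints removes exact duplicate endpoint strings while keeping the
// first occurrence of each and preserving the original order.
func dedupeEndpoints(endpoints []string) []string {
	seen := make(map[string]struct{}, len(endpoints))
	out := make([]string, 0, len(endpoints))
	for _, ep := range endpoints {
		if _, ok := seen[ep]; ok {
			continue
		}
		seen[ep] = struct{}{}
		out = append(out, ep)
	}
	return out
}

func main() {
	// Example URLs are illustrative only.
	eps := []string{"https://a.example.com", "https://b.example.com", "https://a.example.com"}
	fmt.Println(dedupeEndpoints(eps))
}
```

Note that applying this in NewSender would collapse the test's intentionally duplicated endpoint to a single client, so the test would then need distinct endpoints to exercise the multi-write path.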
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rollup/internal/controller/relayer/relayer_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
rollup/internal/controller/relayer/relayer_test.go (2)
rollup/internal/config/l2.go (1)
L2Config (10-29)
rollup/internal/config/relayer.go (2)
RelayerConfig (61-86)
SenderConfig (9-38)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: check
- GitHub Check: tests
- GitHub Check: tests
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@             Coverage Diff             @@
##           develop    #1740      +/-   ##
===========================================
- Coverage    36.98%   36.90%   -0.08%
===========================================
  Files          245      245
  Lines        20804    20889      +85
===========================================
+ Hits          7695     7710      +15
- Misses       12299    12368      +69
- Partials       810      811       +1
Purpose or design rationale of this PR
This PR introduces the option to add multiple clients to the transaction sender. This allows for more redundant tx submission and we can submit to multiple RPCs/builders at once. This should help us to decrease the time for blob transactions to be included.
PR title
Your PR title must follow conventional commits (as we are doing squash merge for each PR), so it must start with one of the following types:
Deployment tag versioning
Has tag in common/version.go been updated or have you added bump-version label to this PR?
Breaking change label
Does this PR have the breaking-change label?
Summary by CodeRabbit
New Features
Tests
Chores