Skip to content

Commit 32f75df

Browse files
committed
refactor: parallelize compression transactions
1 parent 1fc9a09 commit 32f75df

File tree

4 files changed

+64
-17
lines changed

4 files changed

+64
-17
lines changed

forester/src/compressible/compressor.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@ pub struct Compressor<R: Rpc> {
3131
payer_keypair: Keypair,
3232
}
3333

34+
impl<R: Rpc> Clone for Compressor<R> {
35+
fn clone(&self) -> Self {
36+
Self {
37+
rpc_pool: Arc::clone(&self.rpc_pool),
38+
tracker: Arc::clone(&self.tracker),
39+
payer_keypair: self.payer_keypair.insecure_clone(),
40+
}
41+
}
42+
}
43+
3444
impl<R: Rpc> Compressor<R> {
3545
pub fn new(
3646
rpc_pool: Arc<SolanaRpcPool<R>>,

forester/src/compressible/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,25 @@ pub struct CompressibleConfig {
77
/// Batch size for compression operations
88
#[serde(default = "default_batch_size")]
99
pub batch_size: usize,
10+
/// Maximum number of concurrent compression batches
11+
#[serde(default = "default_max_concurrent_batches")]
12+
pub max_concurrent_batches: usize,
1013
}
1114

1215
fn default_batch_size() -> usize {
1316
10
1417
}
1518

19+
fn default_max_concurrent_batches() -> usize {
20+
10
21+
}
22+
1623
impl CompressibleConfig {
1724
pub fn new(ws_url: String) -> Self {
1825
Self {
1926
ws_url,
2027
batch_size: default_batch_size(),
28+
max_concurrent_batches: default_max_concurrent_batches(),
2129
}
2230
}
2331
}

forester/src/epoch_manager.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1684,34 +1684,62 @@ impl<R: Rpc> EpochManager<R> {
16841684
current_epoch,
16851685
);
16861686

1687-
let mut total_compressed = 0;
1688-
for (batch_idx, batch) in accounts.chunks(config.batch_size).enumerate() {
1689-
debug!(
1690-
"Processing compression batch {}/{} with {} accounts",
1691-
batch_idx + 1,
1692-
num_batches,
1693-
batch.len()
1694-
);
1687+
// Create parallel compression futures
1688+
use futures::stream::StreamExt;
1689+
1690+
// Collect chunks into owned vectors to avoid lifetime issues
1691+
let batches: Vec<(usize, Vec<_>)> = accounts
1692+
.chunks(config.batch_size)
1693+
.enumerate()
1694+
.map(|(idx, chunk)| (idx, chunk.to_vec()))
1695+
.collect();
1696+
1697+
let compression_futures = batches.into_iter().map(|(batch_idx, batch)| {
1698+
let compressor = compressor.clone();
1699+
async move {
1700+
debug!(
1701+
"Processing compression batch {}/{} with {} accounts",
1702+
batch_idx + 1,
1703+
num_batches,
1704+
batch.len()
1705+
);
16951706

1696-
match compressor
1697-
.compress_batch(batch, registered_forester_pda)
1698-
.await
1699-
{
1700-
Ok(sig) => {
1707+
match compressor
1708+
.compress_batch(&batch, registered_forester_pda)
1709+
.await
1710+
{
1711+
Ok(sig) => Ok((batch_idx, batch.len(), sig)),
1712+
Err(e) => Err((batch_idx, batch.len(), e)),
1713+
}
1714+
}
1715+
});
1716+
1717+
// Execute batches in parallel with concurrency limit
1718+
let results = futures::stream::iter(compression_futures)
1719+
.buffer_unordered(config.max_concurrent_batches)
1720+
.collect::<Vec<_>>()
1721+
.await;
1722+
1723+
// Aggregate results
1724+
let mut total_compressed = 0;
1725+
for result in results {
1726+
match result {
1727+
Ok((batch_idx, count, sig)) => {
17011728
info!(
17021729
"Successfully compressed {} accounts in batch {}/{}: {}",
1703-
batch.len(),
1730+
count,
17041731
batch_idx + 1,
17051732
num_batches,
17061733
sig
17071734
);
1708-
total_compressed += batch.len();
1735+
total_compressed += count;
17091736
}
1710-
Err(e) => {
1737+
Err((batch_idx, count, e)) => {
17111738
error!(
1712-
"Compression batch {}/{} failed: {:?}",
1739+
"Compression batch {}/{} ({} accounts) failed: {:?}",
17131740
batch_idx + 1,
17141741
num_batches,
1742+
count,
17151743
e
17161744
);
17171745
}

forester/tests/e2e_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ async fn e2e_test() {
254254
compressible_config: Some(forester::compressible::CompressibleConfig {
255255
ws_url: get_ws_rpc_url(),
256256
batch_size: 10,
257+
max_concurrent_batches: 10,
257258
}),
258259
};
259260
let test_mode = TestMode::from_env();

0 commit comments

Comments
 (0)