Skip to content

Commit a984ff4

Browse files
committed
refactor: integrate compression into forester tree flo
1 parent f9ac11b commit a984ff4

File tree

15 files changed

+212
-300
lines changed

15 files changed

+212
-300
lines changed

.coderabbit.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ reviews:
4949
enabled: true
5050

5151
tools:
52+
github-checks:
53+
timeout_ms: 900000
5254
eslint:
5355
enabled: true
5456
ruff:

.github/workflows/forester-tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ env:
3838
TEST_V2_STATE: "true"
3939
TEST_V1_ADDRESS: "true"
4040
TEST_V2_ADDRESS: "true"
41+
FORESTER_ENABLE_COMPRESSIBLE: "true"
4142

4243
jobs:
4344
test:

forester/src/cli.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,11 @@ pub struct StartArgs {
223223

224224
#[arg(
225225
long,
226-
env = "FORESTER_COMPRESSIBLE_WS_URL",
227-
help = "WebSocket URL for compressible account subscriptions (enables compressible mode; defaults to ws_rpc_url if not specified)"
226+
env = "FORESTER_ENABLE_COMPRESSIBLE",
227+
help = "Enable compressible account tracking and compression using ws_rpc_url",
228+
default_value = "false"
228229
)]
229-
pub compressible_ws_url: Option<String>, // TODO: check whether we already have a ws url and can make this a bool
230+
pub enable_compressible: bool,
230231
}
231232

232233
#[derive(Parser, Clone, Debug)]

forester/src/compressible/compressor.rs

Lines changed: 7 additions & 223 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
1-
use std::{str::FromStr, sync::Arc, time::Duration};
1+
use std::{str::FromStr, sync::Arc};
22

33
use anchor_lang::{InstructionData, ToAccountMetas};
4-
use forester_utils::{
5-
forester_epoch::{Epoch, EpochPhases},
6-
rpc_pool::SolanaRpcPool,
7-
};
4+
use forester_utils::rpc_pool::SolanaRpcPool;
85
use light_client::rpc::Rpc;
96
use light_compressed_token_sdk::instructions::compress_and_close::CompressAndCloseAccounts as CTokenAccounts;
107
use light_compressible::config::CompressibleConfig;
118
use light_ctoken_types::COMPRESSED_TOKEN_PROGRAM_ID;
129
use light_registry::{
1310
accounts::CompressAndCloseContext, compressible::compressed_token::CompressAndCloseIndices,
14-
instruction::CompressAndClose, protocol_config::state::EpochState,
15-
utils::get_forester_epoch_pda_from_authority, ForesterEpochPda,
11+
instruction::CompressAndClose,
1612
};
1713
use light_sdk::instruction::PackedAccounts;
1814
use solana_sdk::{
@@ -21,244 +17,37 @@ use solana_sdk::{
2117
signature::{Keypair, Signature},
2218
signer::Signer,
2319
};
24-
use tracing::{debug, error, info, warn};
20+
use tracing::{debug, warn};
2521

2622
use super::{state::CompressibleAccountTracker, types::CompressibleAccountState};
27-
use crate::{slot_tracker::SlotTracker, Result};
23+
use crate::Result;
2824

2925
const REGISTRY_PROGRAM_ID_STR: &str = "Lighton6oQpVkeewmo2mcPTQQp7kYHr4fWpAgJyEmDX";
3026

31-
/// Input parameters for spawning a compression task
32-
pub struct SpawnCompressionTaskInput<R: Rpc> {
33-
pub tracker: Arc<CompressibleAccountTracker>,
34-
pub config: super::config::CompressibleConfig,
35-
pub epoch_info: Epoch,
36-
pub rpc_pool: Arc<SolanaRpcPool<R>>,
37-
pub payer_keypair: Keypair,
38-
pub slot_tracker: Arc<SlotTracker>,
39-
pub sleep_after_processing_ms: u64,
40-
pub sleep_when_idle_ms: u64,
41-
}
42-
4327
/// Compression executor that builds and sends compress_and_close transactions via registry program
4428
pub struct Compressor<R: Rpc> {
4529
rpc_pool: Arc<SolanaRpcPool<R>>,
4630
tracker: Arc<CompressibleAccountTracker>,
4731
payer_keypair: Keypair,
48-
slot_tracker: Arc<SlotTracker>,
49-
batch_size: usize,
5032
}
5133

5234
impl<R: Rpc> Compressor<R> {
5335
pub fn new(
5436
rpc_pool: Arc<SolanaRpcPool<R>>,
5537
tracker: Arc<CompressibleAccountTracker>,
5638
payer_keypair: Keypair,
57-
slot_tracker: Arc<SlotTracker>,
58-
batch_size: usize,
5939
) -> Self {
6040
Self {
6141
rpc_pool,
6242
tracker,
6343
payer_keypair,
64-
slot_tracker,
65-
batch_size,
66-
}
67-
}
68-
69-
/// Spawns a compression task for the given epoch
70-
///
71-
/// Returns a JoinHandle that completes when the compression task finishes
72-
pub fn spawn_task(
73-
input: SpawnCompressionTaskInput<R>,
74-
) -> tokio::task::JoinHandle<crate::Result<()>> {
75-
let batch_size = input.config.batch_size;
76-
77-
info!(
78-
"Spawning compression task for epoch {}",
79-
input.epoch_info.epoch
80-
);
81-
82-
tokio::spawn(async move {
83-
let mut compressor = Self::new(
84-
input.rpc_pool,
85-
input.tracker,
86-
input.payer_keypair,
87-
input.slot_tracker.clone(),
88-
batch_size,
89-
);
90-
91-
match compressor
92-
.run_for_epoch(
93-
&input.epoch,
94-
input.sleep_after_processing_ms,
95-
input.sleep_when_idle_ms,
96-
)
97-
.await
98-
{
99-
Ok(()) => {
100-
info!("Compression task completed for epoch {}", input.epoch.epoch);
101-
Ok(())
102-
}
103-
Err(e) => {
104-
error!("Compression task error: {:?}", e);
105-
Err(e)
106-
}
107-
}
108-
})
109-
}
110-
111-
/// Run compression for a specific epoch during the active phase
112-
pub async fn run_for_epoch(
113-
&mut self,
114-
epoch: &Epoch,
115-
sleep_after_processing_ms: u64,
116-
sleep_when_idle_ms: u64,
117-
) -> Result<()> {
118-
let current_epoch = epoch.epoch;
119-
let active_phase_end_slot = epoch.phases.active.end;
120-
let epoch_phases = &epoch.phases;
121-
122-
info!(
123-
"Starting compression for epoch {} (active phase ends at slot {})",
124-
current_epoch, active_phase_end_slot
125-
);
126-
127-
while self.slot_tracker.estimated_current_slot() < active_phase_end_slot {
128-
let current_slot = self.slot_tracker.estimated_current_slot();
129-
130-
// Check if still in active phase
131-
let current_phase = epoch_phases.get_current_epoch_state(current_slot);
132-
if current_phase != EpochState::Active {
133-
info!(
134-
"No longer in active phase (current phase: {:?}), exiting compression",
135-
current_phase
136-
);
137-
break;
138-
}
139-
140-
// Check forester eligibility
141-
if !self
142-
.check_compression_eligibility(current_epoch, current_slot, epoch_phases)
143-
.await?
144-
{
145-
warn!(
146-
"Forester no longer eligible for compression in epoch {}",
147-
current_epoch
148-
);
149-
break;
150-
}
151-
152-
// Get accounts that are ready to be compressed
153-
let accounts = self.tracker.get_ready_to_compress(current_slot);
154-
155-
if accounts.is_empty() {
156-
debug!("No compressible accounts found");
157-
tokio::time::sleep(Duration::from_millis(sleep_when_idle_ms)).await;
158-
continue;
159-
}
160-
161-
info!("Found {} compressible accounts", accounts.len());
162-
163-
let mut total_compressed = 0;
164-
165-
// Process in batches
166-
for (batch_num, batch) in accounts.chunks(self.batch_size).enumerate() {
167-
info!(
168-
"Processing batch {} with {} accounts",
169-
batch_num + 1,
170-
batch.len()
171-
);
172-
173-
match self.compress_batch(batch, current_epoch).await {
174-
Ok(sig) => {
175-
info!(
176-
"Successfully compressed {} accounts in batch {}: {}",
177-
batch.len(),
178-
batch_num + 1,
179-
sig
180-
);
181-
total_compressed += batch.len();
182-
}
183-
Err(e) => {
184-
error!("Failed to compress batch {}: {:?}", batch_num + 1, e);
185-
// Keep accounts in tracker for retry
186-
}
187-
}
188-
}
189-
190-
// Sleep based on whether we did work
191-
let sleep_duration_ms = if total_compressed > 0 {
192-
sleep_after_processing_ms
193-
} else {
194-
sleep_when_idle_ms
195-
};
196-
tokio::time::sleep(Duration::from_millis(sleep_duration_ms)).await;
197-
}
198-
199-
info!("Compression for epoch {} completed", current_epoch);
200-
Ok(())
201-
}
202-
203-
/// Check if forester is eligible for compression in the current epoch
204-
async fn check_compression_eligibility(
205-
&self,
206-
current_epoch: u64,
207-
current_slot: u64,
208-
epoch_phases: &EpochPhases,
209-
) -> Result<bool> {
210-
// Check if in active phase
211-
let current_phase = epoch_phases.get_current_epoch_state(current_slot);
212-
if current_phase != EpochState::Active {
213-
return Ok(false);
21444
}
215-
216-
// Check if forester is registered for this epoch
217-
let (forester_epoch_pda_pubkey, _) =
218-
get_forester_epoch_pda_from_authority(&self.payer_keypair.pubkey(), current_epoch);
219-
220-
let rpc = self.rpc_pool.get_connection().await?;
221-
let forester_epoch_pda = rpc
222-
.get_anchor_account::<ForesterEpochPda>(&forester_epoch_pda_pubkey)
223-
.await?;
224-
225-
let pda = match forester_epoch_pda {
226-
Some(pda) => pda,
227-
None => return Ok(false),
228-
};
229-
230-
// Get total epoch weight
231-
let total_epoch_weight = match pda.total_epoch_weight {
232-
Some(weight) => weight,
233-
None => {
234-
debug!(
235-
"Total epoch weight not yet available for epoch {}",
236-
current_epoch
237-
);
238-
return Ok(false);
239-
}
240-
};
241-
242-
// Calculate current light slot
243-
let current_light_slot =
244-
(current_slot - epoch_phases.active.start) / pda.protocol_config.slot_length;
245-
246-
// Check eligibility using Pubkey::default() (epoch-level, not tree-specific)
247-
let eligible_forester_slot_index = ForesterEpochPda::get_eligible_forester_index(
248-
current_light_slot,
249-
&Pubkey::default(),
250-
total_epoch_weight,
251-
current_epoch,
252-
)
253-
.map_err(|e| anyhow::anyhow!("Failed to calculate eligible forester index: {:?}", e))?;
254-
255-
Ok(pda.is_eligible(eligible_forester_slot_index))
25645
}
25746

25847
pub async fn compress_batch(
25948
&self,
26049
account_states: &[CompressibleAccountState],
261-
current_epoch: u64,
50+
registered_forester_pda: Pubkey,
26251
) -> Result<Signature> {
26352
let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID_STR)?;
26453
let compressed_token_program_id = Pubkey::new_from_array(COMPRESSED_TOKEN_PROGRAM_ID);
@@ -274,11 +63,6 @@ impl<R: Rpc> Compressor<R> {
27463
);
27564

27665
debug!("Compression authority: {}", compression_authority);
277-
278-
// Get registered forester PDA
279-
let (registered_forester_pda, _) =
280-
get_forester_epoch_pda_from_authority(&self.payer_keypair.pubkey(), current_epoch);
281-
28266
debug!("Registered forester PDA: {}", registered_forester_pda);
28367

28468
// Get compressible config PDA
@@ -422,7 +206,7 @@ impl<R: Rpc> Compressor<R> {
422206

423207
// Sync accounts to verify they're closed
424208
if let Err(e) = self.tracker.sync_accounts(&*rpc, &pubkeys).await {
425-
error!("Failed to sync accounts after compression: {:?}", e);
209+
warn!("Failed to sync accounts after compression: {:?}. Tracker will update via subscriptions.", e);
426210
}
427211

428212
Ok(signature)

forester/src/config.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -306,11 +306,13 @@ impl ForesterConfig {
306306
derivation_pubkey: derivation,
307307
address_tree_data: vec![],
308308
state_tree_data: vec![],
309-
compressible_config: args
310-
.compressible_ws_url
311-
.clone()
312-
.or_else(|| args.ws_rpc_url.clone())
313-
.map(crate::compressible::config::CompressibleConfig::new),
309+
compressible_config: if args.enable_compressible {
310+
args.ws_rpc_url
311+
.clone()
312+
.map(crate::compressible::config::CompressibleConfig::new)
313+
} else {
314+
None
315+
},
314316
})
315317
}
316318

0 commit comments

Comments
 (0)