From 3630400486422f876b1dacad0d42d1ddf870b383 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Wed, 17 Dec 2025 18:07:03 -0300 Subject: [PATCH 01/16] Move the comments on the aggregate_and_submit_proofs_on_chain send loop --- aggregation_mode/proof_aggregator/src/backend/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index 9dee2164c..c0d8fbc6b 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -175,13 +175,13 @@ impl ProofAggregator { hex::encode(blob_versioned_hash) ); - // Iterate until we can send the proof on-chain + // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed + // should be considered over a 24h period. let mut time_elapsed = Duration::from_secs(24 * 3600); + // Iterate until we can send the proof on-chain loop { - // We add 24 hours because the proof aggregator runs once a day, so the time elapsed - // should be considered over a 24h period. - + // Fetch gas price from network let gas_price = self .rpc_provider .get_gas_price() From 36a8369144023bd6e190f14f3b63ebb23423a277 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 10:57:29 -0300 Subject: [PATCH 02/16] Initial version with retry only on proof sending --- .../proof_aggregator/src/aggregators/mod.rs | 1 + .../src/aggregators/risc0_aggregator.rs | 1 + .../src/aggregators/sp1_aggregator.rs | 1 + .../proof_aggregator/src/backend/mod.rs | 239 +++++++++++++----- 4 files changed, 184 insertions(+), 58 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/aggregators/mod.rs b/aggregation_mode/proof_aggregator/src/aggregators/mod.rs index 07e4211b8..18fe1b5f1 100644 --- a/aggregation_mode/proof_aggregator/src/aggregators/mod.rs +++ b/aggregation_mode/proof_aggregator/src/aggregators/mod.rs @@ -161,6 +161,7 @@ impl ZKVMEngine { } } +#[derive(Clone)] pub enum AlignedProof { SP1(Box), Risc0(Box), diff --git a/aggregation_mode/proof_aggregator/src/aggregators/risc0_aggregator.rs b/aggregation_mode/proof_aggregator/src/aggregators/risc0_aggregator.rs index 45fcb6526..bceca4193 100644 --- a/aggregation_mode/proof_aggregator/src/aggregators/risc0_aggregator.rs +++ b/aggregation_mode/proof_aggregator/src/aggregators/risc0_aggregator.rs @@ -4,6 +4,7 @@ use aligned_sdk::aggregation_layer::AggregationModeProvingSystem; use risc0_zkvm::{default_prover, ExecutorEnv, ProverOpts, Receipt}; use sha3::{Digest, Keccak256}; +#[derive(Clone)] pub struct Risc0ProofReceiptAndImageId { pub image_id: [u8; 32], pub receipt: Receipt, diff --git a/aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs b/aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs index d4e8df8b4..2ead623b0 100644 --- a/aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs +++ b/aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs @@ -25,6 +25,7 @@ static SP1_PROVER_CLIENT: LazyLock = LazyLock::new(ProverClient::from static SP1_PROVER_CLIENT_CPU: LazyLock = LazyLock::new(|| ProverClient::builder().cpu().build()); +#[derive(Clone)] pub struct SP1ProofWithPubValuesAndVk { pub proof_with_pub_values: SP1ProofWithPublicValues, pub vk: SP1VerifyingKey, diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index c0d8fbc6b..e890962e7 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -205,8 +205,10 @@ impl ProofAggregator { } info!("Sending proof to ProofAggregationService contract..."); + + // Retry in case of failure let receipt = self - .send_proof_to_verify_on_chain(blob, blob_versioned_hash, aggregated_proof) + .send_proof_to_verify_on_chain_retryable(blob, blob_versioned_hash, aggregated_proof) .await?; info!( "Proof sent and verified, tx hash {:?}", @@ -272,70 +274,49 @@ impl ProofAggregator { expected_cost_in_wei <= max_to_spend_in_wei } - async fn send_proof_to_verify_on_chain( + async fn send_proof_to_verify_on_chain_retryable( &self, blob: BlobTransactionSidecar, blob_versioned_hash: [u8; 32], aggregated_proof: AlignedProof, ) -> Result { - let tx_req = match aggregated_proof { - AlignedProof::SP1(proof) => self - .proof_aggregation_service - .verifyAggregationSP1( - blob_versioned_hash.into(), - proof.proof_with_pub_values.public_values.to_vec().into(), - proof.proof_with_pub_values.bytes().into(), - self.sp1_chunk_aggregator_vk_hash_bytes.into(), + match send_proof_to_verify_on_chain( + blob.clone(), + blob_versioned_hash, + aggregated_proof.clone(), + self.proof_aggregation_service.clone(), + self.sp1_chunk_aggregator_vk_hash_bytes, + self.risc0_chunk_aggregator_image_id_bytes, + ) + .await + { + Ok(tx_receipt) => Ok(tx_receipt), + Err(err) => { + tracing::error!("Failed to send proof to be verified on chain: {err:?}"); + + retry_function( + || { + send_proof_to_verify_on_chain( + blob.clone(), + blob_versioned_hash, + aggregated_proof.clone(), + self.proof_aggregation_service.clone(), + self.sp1_chunk_aggregator_vk_hash_bytes, + self.risc0_chunk_aggregator_image_id_bytes, + ) + }, + ETHEREUM_CALL_MIN_RETRY_DELAY, + ETHEREUM_CALL_BACKOFF_FACTOR, + ETHEREUM_CALL_MAX_RETRIES, + ETHEREUM_CALL_MAX_RETRY_DELAY, ) - .sidecar(blob) - .into_transaction_request(), - AlignedProof::Risc0(proof) => { - let encoded_seal = encode_seal(&proof.receipt).map_err(|e| { - AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string()) - })?; - self.proof_aggregation_service - .verifyAggregationRisc0( - blob_versioned_hash.into(), - encoded_seal.into(), - proof.receipt.journal.bytes.into(), - self.risc0_chunk_aggregator_image_id_bytes.into(), - ) - .sidecar(blob) - .into_transaction_request() + .await + .map_err(|e| { + error!("Could't get nonce: {:?}", e); + e.inner() + }) } - }; - - let provider = self.proof_aggregation_service.provider(); - let envelope = provider - .fill(tx_req) - .await - .map_err(Self::send_verify_aggregated_proof_err)? - .try_into_envelope() - .map_err(Self::send_verify_aggregated_proof_err)?; - let tx: EthereumTxEnvelope> = envelope - .try_into_pooled() - .map_err(Self::send_verify_aggregated_proof_err)? - .try_map_eip4844(|tx| { - tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get())) - }) - .map_err(Self::send_verify_aggregated_proof_err)?; - - let encoded_tx = tx.encoded_2718(); - let pending_tx = provider - .send_raw_transaction(&encoded_tx) - .await - .map_err(Self::send_verify_aggregated_proof_err)?; - - let receipt = pending_tx - .get_receipt() - .await - .map_err(Self::send_verify_aggregated_proof_err)?; - - Ok(receipt) - } - - fn send_verify_aggregated_proof_err(err: E) -> AggregatedProofSubmissionError { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + } } /// ### Blob capacity @@ -411,6 +392,148 @@ impl ProofAggregator { } } +async fn send_proof_to_verify_on_chain( + blob: BlobTransactionSidecar, + blob_versioned_hash: [u8; 32], + aggregated_proof: AlignedProof, + proof_aggregation_service: AlignedProofAggregationServiceContract, + sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], + risc0_chunk_aggregator_image_id_bytes: [u8; 32], +) -> Result> { + let tx_req = match aggregated_proof { + AlignedProof::SP1(proof) => proof_aggregation_service + .verifyAggregationSP1( + blob_versioned_hash.into(), + proof.proof_with_pub_values.public_values.to_vec().into(), + proof.proof_with_pub_values.bytes().into(), + sp1_chunk_aggregator_vk_hash_bytes.into(), + ) + .sidecar(blob) + .into_transaction_request(), + AlignedProof::Risc0(proof) => { + let encoded_seal = encode_seal(&proof.receipt) + .map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string())) + .map_err(RetryError::Transient)?; + proof_aggregation_service + .verifyAggregationRisc0( + blob_versioned_hash.into(), + encoded_seal.into(), + proof.receipt.journal.bytes.into(), + risc0_chunk_aggregator_image_id_bytes.into(), + ) + .sidecar(blob) + .into_transaction_request() + } + }; + + let provider = proof_aggregation_service.provider(); + let envelope = provider + .fill(tx_req) + .await + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)? + .try_into_envelope() + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)?; + let tx: EthereumTxEnvelope> = envelope + .try_into_pooled() + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)? + .try_map_eip4844(|tx| { + tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get())) + }) + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)?; + + let encoded_tx = tx.encoded_2718(); + let pending_tx = provider + .send_raw_transaction(&encoded_tx) + .await + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)?; + + let receipt = pending_tx + .get_receipt() + .await + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)?; + + Ok(receipt) +} + +use backon::ExponentialBuilder; +use backon::Retryable; +use std::future::Future; + +#[derive(Debug)] +pub enum RetryError { + Transient(E), + Permanent(E), +} + +impl std::fmt::Display for RetryError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + RetryError::Transient(e) => write!(f, "{}", e), + RetryError::Permanent(e) => write!(f, "{}", e), + } + } +} + +impl RetryError { + pub fn inner(self) -> E { + match self { + RetryError::Transient(e) => e, + RetryError::Permanent(e) => e, + } + } +} + +impl std::error::Error for RetryError where E: std::fmt::Debug {} + +pub const ETHEREUM_CALL_MIN_RETRY_DELAY: u64 = 500; // milliseconds +pub const ETHEREUM_CALL_MAX_RETRIES: usize = 5; +pub const ETHEREUM_CALL_BACKOFF_FACTOR: f32 = 2.0; +pub const ETHEREUM_CALL_MAX_RETRY_DELAY: u64 = 60; // seconds + +/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function +/// Runs with `jitter: false`. +pub async fn retry_function( + function: FutureFn, + min_delay: u64, + factor: f32, + max_times: usize, + max_delay: u64, +) -> Result> +where + Fut: Future>>, + FutureFn: FnMut() -> Fut, +{ + let backoff = ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(min_delay)) + .with_max_times(max_times) + .with_factor(factor) + .with_max_delay(Duration::from_secs(max_delay)); + + function + .retry(backoff) + .sleep(tokio::time::sleep) + .when(|e| matches!(e, RetryError::Transient(_))) + .await +} + #[cfg(test)] mod tests { use super::*; From 62cb33b8f15dd6d169b8a64fa0bd122bdfc5707d Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 10:58:51 -0300 Subject: [PATCH 03/16] Include the bump part in the retryable function --- .../proof_aggregator/src/backend/mod.rs | 212 +++++++++--------- 1 file changed, 100 insertions(+), 112 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index e890962e7..c31f3b068 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -175,40 +175,12 @@ impl ProofAggregator { hex::encode(blob_versioned_hash) ); - // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed - // should be considered over a 24h period. - let mut time_elapsed = Duration::from_secs(24 * 3600); - - // Iterate until we can send the proof on-chain - loop { - // Fetch gas price from network - let gas_price = self - .rpc_provider - .get_gas_price() - .await - .map_err(|e| AggregatedProofSubmissionError::GasPriceError(e.to_string()))?; - - if Self::should_send_proof_to_verify_on_chain( - time_elapsed, - self.config.monthly_budget_eth, - U256::from(gas_price), - ) { - break; - } else { - info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints."); - } - - // Sleep for 3 minutes (15 blocks) before re-evaluating - let time_to_sleep = Duration::from_secs(180); - time_elapsed += time_to_sleep; - sleep(time_to_sleep); - } - - info!("Sending proof to ProofAggregationService contract..."); - - // Retry in case of failure let receipt = self - .send_proof_to_verify_on_chain_retryable(blob, blob_versioned_hash, aggregated_proof) + .bump_and_send_proof_to_verify_on_chain_retryable( + blob, + blob_versioned_hash, + aggregated_proof, + ) .await?; info!( "Proof sent and verified, tx hash {:?}", @@ -239,84 +211,35 @@ impl ProofAggregator { Ok(()) } - fn max_to_spend_in_wei(time_elapsed: Duration, monthly_eth_budget: f64) -> U256 { - const SECONDS_PER_MONTH: u64 = 30 * 24 * 60 * 60; - - // Note: this expect is safe because in case it was invalid, should have been caught at startup - let monthly_budget_in_wei = parse_ether(&monthly_eth_budget.to_string()) - .expect("The monthly budget should be a non-negative value"); - - let elapsed_seconds = U256::from(time_elapsed.as_secs()); - - let budget_available_per_second_in_wei = - monthly_budget_in_wei / U256::from(SECONDS_PER_MONTH); - - budget_available_per_second_in_wei * elapsed_seconds - } - - /// Decides whether to send the aggregated proof to be verified on-chain based on - /// time elapsed since last submission and monthly ETH budget. - /// We make a linear function with the eth to spend this month and the time elapsed since last submission. - /// If eth to spend / elapsed time is over the linear function, we skip the submission. - fn should_send_proof_to_verify_on_chain( - time_elapsed: Duration, - monthly_eth_budget: f64, - network_gas_price: U256, - ) -> bool { - // We assume a fixed gas cost of 300,000 for each of the 2 transactions - const ON_CHAIN_COST_IN_GAS_UNITS: u64 = 600_000u64; - - let on_chain_cost_in_gas: U256 = U256::from(ON_CHAIN_COST_IN_GAS_UNITS); - let max_to_spend_in_wei = Self::max_to_spend_in_wei(time_elapsed, monthly_eth_budget); - - let expected_cost_in_wei = network_gas_price * on_chain_cost_in_gas; - - expected_cost_in_wei <= max_to_spend_in_wei - } - - async fn send_proof_to_verify_on_chain_retryable( + async fn bump_and_send_proof_to_verify_on_chain_retryable( &self, blob: BlobTransactionSidecar, blob_versioned_hash: [u8; 32], aggregated_proof: AlignedProof, ) -> Result { - match send_proof_to_verify_on_chain( - blob.clone(), - blob_versioned_hash, - aggregated_proof.clone(), - self.proof_aggregation_service.clone(), - self.sp1_chunk_aggregator_vk_hash_bytes, - self.risc0_chunk_aggregator_image_id_bytes, + retry_function( + || { + bump_and_send_proof_to_verify_on_chain( + blob.clone(), + blob_versioned_hash, + aggregated_proof.clone(), + self.proof_aggregation_service.clone(), + self.sp1_chunk_aggregator_vk_hash_bytes, + self.risc0_chunk_aggregator_image_id_bytes, + self.rpc_provider.clone(), + self.config.monthly_budget_eth, + ) + }, + ETHEREUM_CALL_MIN_RETRY_DELAY, + ETHEREUM_CALL_BACKOFF_FACTOR, + ETHEREUM_CALL_MAX_RETRIES, + ETHEREUM_CALL_MAX_RETRY_DELAY, ) .await - { - Ok(tx_receipt) => Ok(tx_receipt), - Err(err) => { - tracing::error!("Failed to send proof to be verified on chain: {err:?}"); - - retry_function( - || { - send_proof_to_verify_on_chain( - blob.clone(), - blob_versioned_hash, - aggregated_proof.clone(), - self.proof_aggregation_service.clone(), - self.sp1_chunk_aggregator_vk_hash_bytes, - self.risc0_chunk_aggregator_image_id_bytes, - ) - }, - ETHEREUM_CALL_MIN_RETRY_DELAY, - ETHEREUM_CALL_BACKOFF_FACTOR, - ETHEREUM_CALL_MAX_RETRIES, - ETHEREUM_CALL_MAX_RETRY_DELAY, - ) - .await - .map_err(|e| { - error!("Could't get nonce: {:?}", e); - e.inner() - }) - } - } + .map_err(|e| { + error!("Could't get nonce: {:?}", e); + e.inner() + }) } /// ### Blob capacity @@ -392,14 +315,45 @@ impl ProofAggregator { } } -async fn send_proof_to_verify_on_chain( +async fn bump_and_send_proof_to_verify_on_chain( blob: BlobTransactionSidecar, blob_versioned_hash: [u8; 32], aggregated_proof: AlignedProof, proof_aggregation_service: AlignedProofAggregationServiceContract, sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], risc0_chunk_aggregator_image_id_bytes: [u8; 32], + rpc_provider: RPCProvider, + monthly_budget_eth: f64, ) -> Result> { + // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed + // should be considered over a 24h period. + let mut time_elapsed = Duration::from_secs(24 * 3600); + + // Iterate until we can send the proof on-chain + loop { + // Fetch gas price from network + let gas_price = rpc_provider.get_gas_price().await.map_err(|e| { + RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string())) + })?; + + if should_send_proof_to_verify_on_chain( + time_elapsed, + monthly_budget_eth, + U256::from(gas_price), + ) { + break; + } else { + info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints."); + } + + // Sleep for 3 minutes (15 blocks) before re-evaluating + let time_to_sleep = Duration::from_secs(180); + time_elapsed += time_to_sleep; + sleep(time_to_sleep); + } + + info!("Sending proof to ProofAggregationService contract..."); + let tx_req = match aggregated_proof { AlignedProof::SP1(proof) => proof_aggregation_service .verifyAggregationSP1( @@ -473,6 +427,40 @@ async fn send_proof_to_verify_on_chain( Ok(receipt) } +/// Decides whether to send the aggregated proof to be verified on-chain based on +/// time elapsed since last submission and monthly ETH budget. +/// We make a linear function with the eth to spend this month and the time elapsed since last submission. +/// If eth to spend / elapsed time is over the linear function, we skip the submission. +fn should_send_proof_to_verify_on_chain( + time_elapsed: Duration, + monthly_eth_budget: f64, + network_gas_price: U256, +) -> bool { + // We assume a fixed gas cost of 300,000 for each of the 2 transactions + const ON_CHAIN_COST_IN_GAS_UNITS: u64 = 600_000u64; + + let on_chain_cost_in_gas: U256 = U256::from(ON_CHAIN_COST_IN_GAS_UNITS); + let max_to_spend_in_wei = max_to_spend_in_wei(time_elapsed, monthly_eth_budget); + + let expected_cost_in_wei = network_gas_price * on_chain_cost_in_gas; + + expected_cost_in_wei <= max_to_spend_in_wei +} + +fn max_to_spend_in_wei(time_elapsed: Duration, monthly_eth_budget: f64) -> U256 { + const SECONDS_PER_MONTH: u64 = 30 * 24 * 60 * 60; + + // Note: this expect is safe because in case it was invalid, should have been caught at startup + let monthly_budget_in_wei = parse_ether(&monthly_eth_budget.to_string()) + .expect("The monthly budget should be a non-negative value"); + + let elapsed_seconds = U256::from(time_elapsed.as_secs()); + + let budget_available_per_second_in_wei = monthly_budget_in_wei / U256::from(SECONDS_PER_MONTH); + + budget_available_per_second_in_wei * elapsed_seconds +} + use backon::ExponentialBuilder; use backon::Retryable; use std::future::Future; @@ -554,7 +542,7 @@ mod tests { // Max to spend: 0.000000058 ETH/hour * 24 hours = 0.005 ETH // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH // Expected cost < Max to spend, so we can send the proof - assert!(ProofAggregator::should_send_proof_to_verify_on_chain( + assert!(should_send_proof_to_verify_on_chain( Duration::from_secs(ONE_DAY_SECONDS), // 24 hours BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget gas_price, // 1 Gwei gas price @@ -567,7 +555,7 @@ mod tests { // Max to spend: 0.000000058 ETH/hour * 24 hours = 0.005 ETH // Expected cost: 600,000 * 8 Gwei = 0.0048 ETH // Expected cost < Max to spend, so we can send the proof - assert!(ProofAggregator::should_send_proof_to_verify_on_chain( + assert!(should_send_proof_to_verify_on_chain( Duration::from_secs(ONE_DAY_SECONDS), // 24 hours BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget U256::from(8_000_000_000u64), // 8 Gwei gas price @@ -580,7 +568,7 @@ mod tests { // Max to spend: 0.000000058 ETH/hour * 24 hours = 0.005 ETH // Expected cost: 600,000 * 10 Gwei = 0.006 ETH // Expected cost > Max to spend, so we cannot send the proof - assert!(!ProofAggregator::should_send_proof_to_verify_on_chain( + assert!(!should_send_proof_to_verify_on_chain( Duration::from_secs(ONE_DAY_SECONDS), // 24 hours BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget U256::from(10_000_000_000u64), // 10 Gwei gas price @@ -593,7 +581,7 @@ mod tests { // Max to spend: 0.000000058 ETH/hour * 3 hours = 0.000625 ETH // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH // Expected cost < Max to spend, so we can send the proof - assert!(ProofAggregator::should_send_proof_to_verify_on_chain( + assert!(should_send_proof_to_verify_on_chain( Duration::from_secs(3 * 3600), // 3 hours BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget gas_price, // 1 Gwei gas price @@ -606,7 +594,7 @@ mod tests { // Max to spend: 0.000000058 ETH/hour * 1.2 hours = 0.00025 ETH // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH // Expected cost > Max to spend, so we cannot send the proof - assert!(!ProofAggregator::should_send_proof_to_verify_on_chain( + assert!(!should_send_proof_to_verify_on_chain( Duration::from_secs_f64(1.2 * 3600.0), // 1.2 hours BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget gas_price, // 1 Gwei gas price @@ -619,7 +607,7 @@ mod tests { // Max to spend: 0.000000038 ETH/hour * 24 hours = 0.0032832 ETH // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH // Expected cost < Max to spend, so we can send the proof - assert!(ProofAggregator::should_send_proof_to_verify_on_chain( + assert!(should_send_proof_to_verify_on_chain( Duration::from_secs(ONE_DAY_SECONDS), // 24 hours 0.1, // 0.1 ETH monthly budget gas_price, // 1 Gwei gas price @@ -632,7 +620,7 @@ mod tests { // Max to spend: 0.0000000038 ETH/hour * 24 hours = 0.00032832 ETH // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH // Expected cost > Max to spend, so we cannot send the proof - assert!(!ProofAggregator::should_send_proof_to_verify_on_chain( + assert!(!should_send_proof_to_verify_on_chain( Duration::from_secs(ONE_DAY_SECONDS), // 24 hours 0.01, // 0.01 ETH monthly budget gas_price, // 1 Gwei gas price From aa9ddc5ed2619151cd95fcd3f995b5129e6fbc41 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 11:42:45 -0300 Subject: [PATCH 04/16] Remove unnecessary rpc provider param from Proof aggregator --- .../proof_aggregator/src/backend/mod.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index c31f3b068..6a87067d2 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -27,7 +27,7 @@ use sqlx::types::Uuid; use std::thread::sleep; use std::{str::FromStr, time::Duration}; use tracing::{error, info, warn}; -use types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract, RPCProvider}; +use types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract}; #[derive(Debug)] pub enum AggregatedProofSubmissionError { @@ -50,7 +50,6 @@ pub struct ProofAggregator { proof_aggregation_service: AlignedProofAggregationServiceContract, fetcher: ProofsFetcher, config: Config, - rpc_provider: RPCProvider, sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], risc0_chunk_aggregator_image_id_bytes: [u8; 32], db: Db, @@ -72,8 +71,6 @@ impl ProofAggregator { info!("Monthly budget set to {} eth", config.monthly_budget_eth); - let rpc_provider = ProviderBuilder::new().connect_http(rpc_url.clone()); - let signed_rpc_provider = ProviderBuilder::new().wallet(wallet).connect_http(rpc_url); let proof_aggregation_service = AlignedProofAggregationService::new( @@ -108,7 +105,6 @@ impl ProofAggregator { proof_aggregation_service, fetcher, config, - rpc_provider, sp1_chunk_aggregator_vk_hash_bytes, risc0_chunk_aggregator_image_id_bytes, db, @@ -226,7 +222,6 @@ impl ProofAggregator { self.proof_aggregation_service.clone(), self.sp1_chunk_aggregator_vk_hash_bytes, self.risc0_chunk_aggregator_image_id_bytes, - self.rpc_provider.clone(), self.config.monthly_budget_eth, ) }, @@ -322,7 +317,6 @@ async fn bump_and_send_proof_to_verify_on_chain( proof_aggregation_service: AlignedProofAggregationServiceContract, sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], risc0_chunk_aggregator_image_id_bytes: [u8; 32], - rpc_provider: RPCProvider, monthly_budget_eth: f64, ) -> Result> { // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed @@ -332,9 +326,13 @@ async fn bump_and_send_proof_to_verify_on_chain( // Iterate until we can send the proof on-chain loop { // Fetch gas price from network - let gas_price = rpc_provider.get_gas_price().await.map_err(|e| { - RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string())) - })?; + let gas_price = proof_aggregation_service + .provider() + .get_gas_price() + .await + .map_err(|e| { + RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string())) + })?; if should_send_proof_to_verify_on_chain( time_elapsed, From bca9ee184a9fd07505f0bca1a2e6ed743af5875a Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 11:46:32 -0300 Subject: [PATCH 05/16] Move the helpers and retry logic to separate files --- .../proof_aggregator/src/backend/helpers.rs | 36 ++++++ .../proof_aggregator/src/backend/mod.rs | 110 +++--------------- .../proof_aggregator/src/backend/retry.rs | 56 +++++++++ 3 files changed, 105 insertions(+), 97 deletions(-) create mode 100644 aggregation_mode/proof_aggregator/src/backend/helpers.rs create mode 100644 aggregation_mode/proof_aggregator/src/backend/retry.rs diff --git a/aggregation_mode/proof_aggregator/src/backend/helpers.rs b/aggregation_mode/proof_aggregator/src/backend/helpers.rs new file mode 100644 index 000000000..f8f3a40ac --- /dev/null +++ b/aggregation_mode/proof_aggregator/src/backend/helpers.rs @@ -0,0 +1,36 @@ +use alloy::primitives::{utils::parse_ether, U256}; +use std::time::Duration; + +/// Decides whether to send the aggregated proof to be verified on-chain based on +/// time elapsed since last submission and monthly ETH budget. +/// We make a linear function with the eth to spend this month and the time elapsed since last submission. +/// If eth to spend / elapsed time is over the linear function, we skip the submission. +pub fn should_send_proof_to_verify_on_chain( + time_elapsed: Duration, + monthly_eth_budget: f64, + network_gas_price: U256, +) -> bool { + // We assume a fixed gas cost of 300,000 for each of the 2 transactions + const ON_CHAIN_COST_IN_GAS_UNITS: u64 = 600_000u64; + + let on_chain_cost_in_gas: U256 = U256::from(ON_CHAIN_COST_IN_GAS_UNITS); + let max_to_spend_in_wei = max_to_spend_in_wei(time_elapsed, monthly_eth_budget); + + let expected_cost_in_wei = network_gas_price * on_chain_cost_in_gas; + + expected_cost_in_wei <= max_to_spend_in_wei +} + +fn max_to_spend_in_wei(time_elapsed: Duration, monthly_eth_budget: f64) -> U256 { + const SECONDS_PER_MONTH: u64 = 30 * 24 * 60 * 60; + + // Note: this expect is safe because in case it was invalid, should have been caught at startup + let monthly_budget_in_wei = parse_ether(&monthly_eth_budget.to_string()) + .expect("The monthly budget should be a non-negative value"); + + let elapsed_seconds = U256::from(time_elapsed.as_secs()); + + let budget_available_per_second_in_wei = monthly_budget_in_wei / U256::from(SECONDS_PER_MONTH); + + budget_available_per_second_in_wei * elapsed_seconds +} diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index 6a87067d2..c5ffb5211 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -1,14 +1,23 @@ pub mod config; mod db; pub mod fetcher; +mod helpers; mod merkle_tree; +mod retry; mod types; use crate::{ aggregators::{AlignedProof, ProofAggregationError, ZKVMEngine}, - backend::db::{Db, DbError}, + backend::{ + db::{Db, DbError}, + retry::{retry_function, RetryError}, + }, }; +use aligned_sdk::common::constants::{ + ETHEREUM_CALL_BACKOFF_FACTOR, ETHEREUM_CALL_MAX_RETRIES, ETHEREUM_CALL_MAX_RETRY_DELAY, + ETHEREUM_CALL_MIN_RETRY_DELAY, +}; use alloy::{ consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar}, eips::{eip4844::BYTES_PER_BLOB, eip7594::BlobTransactionSidecarEip7594, Encodable2718}, @@ -334,7 +343,7 @@ async fn bump_and_send_proof_to_verify_on_chain( RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string())) })?; - if should_send_proof_to_verify_on_chain( + if helpers::should_send_proof_to_verify_on_chain( time_elapsed, monthly_budget_eth, U256::from(gas_price), @@ -425,105 +434,12 @@ async fn bump_and_send_proof_to_verify_on_chain( Ok(receipt) } -/// Decides whether to send the aggregated proof to be verified on-chain based on -/// time elapsed since last submission and monthly ETH budget. -/// We make a linear function with the eth to spend this month and the time elapsed since last submission. -/// If eth to spend / elapsed time is over the linear function, we skip the submission. -fn should_send_proof_to_verify_on_chain( - time_elapsed: Duration, - monthly_eth_budget: f64, - network_gas_price: U256, -) -> bool { - // We assume a fixed gas cost of 300,000 for each of the 2 transactions - const ON_CHAIN_COST_IN_GAS_UNITS: u64 = 600_000u64; - - let on_chain_cost_in_gas: U256 = U256::from(ON_CHAIN_COST_IN_GAS_UNITS); - let max_to_spend_in_wei = max_to_spend_in_wei(time_elapsed, monthly_eth_budget); - - let expected_cost_in_wei = network_gas_price * on_chain_cost_in_gas; - - expected_cost_in_wei <= max_to_spend_in_wei -} - -fn max_to_spend_in_wei(time_elapsed: Duration, monthly_eth_budget: f64) -> U256 { - const SECONDS_PER_MONTH: u64 = 30 * 24 * 60 * 60; - - // Note: this expect is safe because in case it was invalid, should have been caught at startup - let monthly_budget_in_wei = parse_ether(&monthly_eth_budget.to_string()) - .expect("The monthly budget should be a non-negative value"); - - let elapsed_seconds = U256::from(time_elapsed.as_secs()); - - let budget_available_per_second_in_wei = monthly_budget_in_wei / U256::from(SECONDS_PER_MONTH); - - budget_available_per_second_in_wei * elapsed_seconds -} - -use backon::ExponentialBuilder; -use backon::Retryable; -use std::future::Future; - -#[derive(Debug)] -pub enum RetryError { - Transient(E), - Permanent(E), -} - -impl std::fmt::Display for RetryError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - RetryError::Transient(e) => write!(f, "{}", e), - RetryError::Permanent(e) => write!(f, "{}", e), - } - } -} - -impl RetryError { - pub fn inner(self) -> E { - match self { - RetryError::Transient(e) => e, - RetryError::Permanent(e) => e, - } - } -} - -impl std::error::Error for RetryError where E: std::fmt::Debug {} - -pub const ETHEREUM_CALL_MIN_RETRY_DELAY: u64 = 500; // milliseconds -pub const ETHEREUM_CALL_MAX_RETRIES: usize = 5; -pub const ETHEREUM_CALL_BACKOFF_FACTOR: f32 = 2.0; -pub const ETHEREUM_CALL_MAX_RETRY_DELAY: u64 = 60; // seconds - -/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function -/// Runs with `jitter: false`. -pub async fn retry_function( - function: FutureFn, - min_delay: u64, - factor: f32, - max_times: usize, - max_delay: u64, -) -> Result> -where - Fut: Future>>, - FutureFn: FnMut() -> Fut, -{ - let backoff = ExponentialBuilder::default() - .with_min_delay(Duration::from_millis(min_delay)) - .with_max_times(max_times) - .with_factor(factor) - .with_max_delay(Duration::from_secs(max_delay)); - - function - .retry(backoff) - .sleep(tokio::time::sleep) - .when(|e| matches!(e, RetryError::Transient(_))) - .await -} - #[cfg(test)] mod tests { use super::*; + use helpers::should_send_proof_to_verify_on_chain; + #[test] fn test_should_send_proof_to_verify_on_chain_updated_cases() { // The should_send_proof_to_verify_on_chain function returns true when: diff --git a/aggregation_mode/proof_aggregator/src/backend/retry.rs b/aggregation_mode/proof_aggregator/src/backend/retry.rs new file mode 100644 index 000000000..cd41a10f1 --- /dev/null +++ b/aggregation_mode/proof_aggregator/src/backend/retry.rs @@ -0,0 +1,56 @@ +use backon::ExponentialBuilder; +use backon::Retryable; +use std::future::Future; +use std::time::Duration; + +#[derive(Debug)] +pub enum RetryError { + Transient(E), + // Permanent(E), +} + +impl std::fmt::Display for RetryError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + RetryError::Transient(e) => write!(f, "{e}"), + //RetryError::Permanent(e) => write!(f, "{e}"), + } + } +} + +impl RetryError { + pub fn inner(self) -> E { + match self { + RetryError::Transient(e) => e, + //RetryError::Permanent(e) => e, + } + } +} + +impl std::error::Error for RetryError where E: std::fmt::Debug {} + +/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function +/// Runs with `jitter: false`. +pub async fn retry_function( + function: FutureFn, + min_delay: u64, + factor: f32, + max_times: usize, + max_delay: u64, +) -> Result> +where + Fut: Future>>, + FutureFn: FnMut() -> Fut, +{ + let backoff = ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(min_delay)) + .with_max_times(max_times) + .with_factor(factor) + .with_max_delay(Duration::from_secs(max_delay)); + + function + .retry(backoff) + .sleep(tokio::time::sleep) + .when(|e| matches!(e, RetryError::Transient(_))) + .await +} From 5828919d870da405a7acdaf6a9d163159d0f25a0 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 12:04:20 -0300 Subject: [PATCH 06/16] Move the wait part to a separate function --- .../proof_aggregator/src/backend/mod.rs | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index c5ffb5211..6d8e2e557 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -224,7 +224,7 @@ impl ProofAggregator { ) -> Result { retry_function( || { - bump_and_send_proof_to_verify_on_chain( + wait_and_send_proof_to_verify_on_chain( blob.clone(), blob_versioned_hash, aggregated_proof.clone(), @@ -319,15 +319,10 @@ impl ProofAggregator { } } -async fn bump_and_send_proof_to_verify_on_chain( - blob: BlobTransactionSidecar, - blob_versioned_hash: [u8; 32], - aggregated_proof: AlignedProof, +async fn wait_until_can_submit_aggregated_proof( proof_aggregation_service: AlignedProofAggregationServiceContract, - sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], - risc0_chunk_aggregator_image_id_bytes: [u8; 32], monthly_budget_eth: f64, -) -> Result> { +) -> Result<(), RetryError> { // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed // should be considered over a 24h period. let mut time_elapsed = Duration::from_secs(24 * 3600); @@ -359,6 +354,21 @@ async fn bump_and_send_proof_to_verify_on_chain( sleep(time_to_sleep); } + Ok(()) +} + +async fn wait_and_send_proof_to_verify_on_chain( + blob: BlobTransactionSidecar, + blob_versioned_hash: [u8; 32], + aggregated_proof: AlignedProof, + proof_aggregation_service: AlignedProofAggregationServiceContract, + sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], + risc0_chunk_aggregator_image_id_bytes: [u8; 32], + monthly_budget_eth: f64, +) -> Result> { + wait_until_can_submit_aggregated_proof(proof_aggregation_service.clone(), monthly_budget_eth) + .await?; + info!("Sending proof to ProofAggregationService contract..."); let tx_req = match aggregated_proof { From f89957f2bcaa5a9764510fb8b8962b3a2e01ee50 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 12:13:22 -0300 Subject: [PATCH 07/16] Move the retryable function to the retry module --- .../proof_aggregator/src/backend/mod.rs | 142 ++---------------- .../proof_aggregator/src/backend/retry.rs | 141 +++++++++++++++++ .../proof_aggregator/src/backend/types.rs | 17 --- 3 files changed, 150 insertions(+), 150 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index 6d8e2e557..bc0931949 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -10,7 +10,7 @@ use crate::{ aggregators::{AlignedProof, ProofAggregationError, ZKVMEngine}, backend::{ db::{Db, DbError}, - retry::{retry_function, RetryError}, + retry::{retry_function, wait_and_send_proof_to_verify_on_chain}, }, }; @@ -19,22 +19,20 @@ use aligned_sdk::common::constants::{ ETHEREUM_CALL_MIN_RETRY_DELAY, }; use alloy::{ - consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar}, - eips::{eip4844::BYTES_PER_BLOB, eip7594::BlobTransactionSidecarEip7594, Encodable2718}, + consensus::BlobTransactionSidecar, + eips::eip4844::BYTES_PER_BLOB, hex, network::EthereumWallet, - primitives::{utils::parse_ether, Address, U256}, - providers::{PendingTransactionError, Provider, ProviderBuilder}, + primitives::{utils::parse_ether, Address}, + providers::{PendingTransactionError, ProviderBuilder}, rpc::types::TransactionReceipt, signers::local::LocalSigner, }; use config::Config; use fetcher::{ProofsFetcher, ProofsFetcherError}; use merkle_tree::compute_proofs_merkle_root; -use risc0_ethereum_contracts::encode_seal; use sqlx::types::Uuid; -use std::thread::sleep; -use std::{str::FromStr, time::Duration}; +use std::str::FromStr; use tracing::{error, info, warn}; use types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract}; @@ -319,136 +317,14 @@ impl ProofAggregator { } } -async fn wait_until_can_submit_aggregated_proof( - proof_aggregation_service: AlignedProofAggregationServiceContract, - monthly_budget_eth: f64, -) -> Result<(), RetryError> { - // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed - // should be considered over a 24h period. - let mut time_elapsed = Duration::from_secs(24 * 3600); - - // Iterate until we can send the proof on-chain - loop { - // Fetch gas price from network - let gas_price = proof_aggregation_service - .provider() - .get_gas_price() - .await - .map_err(|e| { - RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string())) - })?; - - if helpers::should_send_proof_to_verify_on_chain( - time_elapsed, - monthly_budget_eth, - U256::from(gas_price), - ) { - break; - } else { - info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints."); - } - - // Sleep for 3 minutes (15 blocks) before re-evaluating - let time_to_sleep = Duration::from_secs(180); - time_elapsed += time_to_sleep; - sleep(time_to_sleep); - } - - Ok(()) -} - -async fn wait_and_send_proof_to_verify_on_chain( - blob: BlobTransactionSidecar, - blob_versioned_hash: [u8; 32], - aggregated_proof: AlignedProof, - proof_aggregation_service: AlignedProofAggregationServiceContract, - sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], - risc0_chunk_aggregator_image_id_bytes: [u8; 32], - monthly_budget_eth: f64, -) -> Result> { - wait_until_can_submit_aggregated_proof(proof_aggregation_service.clone(), monthly_budget_eth) - .await?; - - info!("Sending proof to ProofAggregationService contract..."); - - let tx_req = match aggregated_proof { - AlignedProof::SP1(proof) => proof_aggregation_service - .verifyAggregationSP1( - blob_versioned_hash.into(), - proof.proof_with_pub_values.public_values.to_vec().into(), - proof.proof_with_pub_values.bytes().into(), - sp1_chunk_aggregator_vk_hash_bytes.into(), - ) - .sidecar(blob) - .into_transaction_request(), - AlignedProof::Risc0(proof) => { - let encoded_seal = encode_seal(&proof.receipt) - .map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string())) - .map_err(RetryError::Transient)?; - proof_aggregation_service - .verifyAggregationRisc0( - blob_versioned_hash.into(), - encoded_seal.into(), - proof.receipt.journal.bytes.into(), - risc0_chunk_aggregator_image_id_bytes.into(), - ) - .sidecar(blob) - .into_transaction_request() - } - }; - - let provider = proof_aggregation_service.provider(); - let envelope = provider - .fill(tx_req) - .await - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)? - .try_into_envelope() - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)?; - let tx: EthereumTxEnvelope> = envelope - .try_into_pooled() - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)? - .try_map_eip4844(|tx| { - tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get())) - }) - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)?; - - let encoded_tx = tx.encoded_2718(); - let pending_tx = provider - .send_raw_transaction(&encoded_tx) - .await - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)?; - - let receipt = pending_tx - .get_receipt() - .await - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)?; - - Ok(receipt) -} - #[cfg(test)] mod tests { + use super::*; + use alloy::primitives::U256; use helpers::should_send_proof_to_verify_on_chain; + use std::time::Duration; #[test] fn test_should_send_proof_to_verify_on_chain_updated_cases() { diff --git a/aggregation_mode/proof_aggregator/src/backend/retry.rs b/aggregation_mode/proof_aggregator/src/backend/retry.rs index cd41a10f1..ce1172944 100644 --- a/aggregation_mode/proof_aggregator/src/backend/retry.rs +++ b/aggregation_mode/proof_aggregator/src/backend/retry.rs @@ -2,6 +2,22 @@ use backon::ExponentialBuilder; use backon::Retryable; use std::future::Future; use std::time::Duration; +use tracing::info; + +use crate::aggregators::AlignedProof; +use crate::backend::types::AlignedProofAggregationServiceContract; +use crate::backend::AggregatedProofSubmissionError; + +use crate::backend::helpers; +use alloy::{ + consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar}, + eips::{eip7594::BlobTransactionSidecarEip7594, Encodable2718}, + primitives::U256, + providers::Provider, + rpc::types::TransactionReceipt, +}; +use risc0_ethereum_contracts::encode_seal; +use std::thread::sleep; #[derive(Debug)] pub enum RetryError { @@ -54,3 +70,128 @@ where .when(|e| matches!(e, RetryError::Transient(_))) .await } + +async fn wait_until_can_submit_aggregated_proof( + proof_aggregation_service: AlignedProofAggregationServiceContract, + monthly_budget_eth: f64, +) -> Result<(), RetryError> { + // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed + // should be considered over a 24h period. + let mut time_elapsed = Duration::from_secs(24 * 3600); + + // Iterate until we can send the proof on-chain + loop { + // Fetch gas price from network + let gas_price = proof_aggregation_service + .provider() + .get_gas_price() + .await + .map_err(|e| { + RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string())) + })?; + + if helpers::should_send_proof_to_verify_on_chain( + time_elapsed, + monthly_budget_eth, + U256::from(gas_price), + ) { + break; + } else { + info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints."); + } + + // Sleep for 3 minutes (15 blocks) before re-evaluating + let time_to_sleep = Duration::from_secs(180); + time_elapsed += time_to_sleep; + sleep(time_to_sleep); + } + + Ok(()) +} + +pub async fn wait_and_send_proof_to_verify_on_chain( + blob: BlobTransactionSidecar, + blob_versioned_hash: [u8; 32], + aggregated_proof: AlignedProof, + proof_aggregation_service: AlignedProofAggregationServiceContract, + sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], + risc0_chunk_aggregator_image_id_bytes: [u8; 32], + monthly_budget_eth: f64, +) -> Result> { + wait_until_can_submit_aggregated_proof(proof_aggregation_service.clone(), monthly_budget_eth) + .await?; + + info!("Sending proof to ProofAggregationService contract..."); + + let tx_req = match aggregated_proof { + AlignedProof::SP1(proof) => proof_aggregation_service + .verifyAggregationSP1( + blob_versioned_hash.into(), + proof.proof_with_pub_values.public_values.to_vec().into(), + proof.proof_with_pub_values.bytes().into(), + sp1_chunk_aggregator_vk_hash_bytes.into(), + ) + .sidecar(blob) + .into_transaction_request(), + AlignedProof::Risc0(proof) => { + let encoded_seal = encode_seal(&proof.receipt) + .map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string())) + .map_err(RetryError::Transient)?; + proof_aggregation_service + .verifyAggregationRisc0( + blob_versioned_hash.into(), + encoded_seal.into(), + proof.receipt.journal.bytes.into(), + risc0_chunk_aggregator_image_id_bytes.into(), + ) + .sidecar(blob) + .into_transaction_request() + } + }; + + let provider = proof_aggregation_service.provider(); + let envelope = provider + .fill(tx_req) + .await + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)? + .try_into_envelope() + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)?; + let tx: EthereumTxEnvelope> = envelope + .try_into_pooled() + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)? + .try_map_eip4844(|tx| { + tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get())) + }) + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)?; + + let encoded_tx = tx.encoded_2718(); + let pending_tx = provider + .send_raw_transaction(&encoded_tx) + .await + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)?; + + let receipt = pending_tx + .get_receipt() + .await + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) + }) + .map_err(RetryError::Transient)?; + + Ok(receipt) +} diff --git a/aggregation_mode/proof_aggregator/src/backend/types.rs b/aggregation_mode/proof_aggregator/src/backend/types.rs index 8d6a92d9e..7f10e803d 100644 --- a/aggregation_mode/proof_aggregator/src/backend/types.rs +++ b/aggregation_mode/proof_aggregator/src/backend/types.rs @@ -30,20 +30,3 @@ pub type AlignedProofAggregationServiceContract = AlignedProofAggregationService RootProvider, >, >; - -pub type RPCProvider = alloy::providers::fillers::FillProvider< - alloy::providers::fillers::JoinFill< - alloy::providers::Identity, - alloy::providers::fillers::JoinFill< - alloy::providers::fillers::GasFiller, - alloy::providers::fillers::JoinFill< - alloy::providers::fillers::BlobGasFiller, - alloy::providers::fillers::JoinFill< - alloy::providers::fillers::NonceFiller, - alloy::providers::fillers::ChainIdFiller, - >, - >, - >, - >, - alloy::providers::RootProvider, ->; From fe32da1983a4a5e03eb283f1321e50c80a0291bd Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 13:42:05 -0300 Subject: [PATCH 08/16] Remove unnecessary derive tags --- aggregation_mode/proof_aggregator/src/aggregators/mod.rs | 1 - .../proof_aggregator/src/aggregators/risc0_aggregator.rs | 1 - .../proof_aggregator/src/aggregators/sp1_aggregator.rs | 1 - aggregation_mode/proof_aggregator/src/backend/mod.rs | 2 +- aggregation_mode/proof_aggregator/src/backend/retry.rs | 4 ++-- 5 files changed, 3 insertions(+), 6 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/aggregators/mod.rs b/aggregation_mode/proof_aggregator/src/aggregators/mod.rs index 18fe1b5f1..07e4211b8 100644 --- a/aggregation_mode/proof_aggregator/src/aggregators/mod.rs +++ b/aggregation_mode/proof_aggregator/src/aggregators/mod.rs @@ -161,7 +161,6 @@ impl ZKVMEngine { } } -#[derive(Clone)] pub enum AlignedProof { SP1(Box), Risc0(Box), diff --git a/aggregation_mode/proof_aggregator/src/aggregators/risc0_aggregator.rs b/aggregation_mode/proof_aggregator/src/aggregators/risc0_aggregator.rs index bceca4193..45fcb6526 100644 --- a/aggregation_mode/proof_aggregator/src/aggregators/risc0_aggregator.rs +++ b/aggregation_mode/proof_aggregator/src/aggregators/risc0_aggregator.rs @@ -4,7 +4,6 @@ use aligned_sdk::aggregation_layer::AggregationModeProvingSystem; use risc0_zkvm::{default_prover, ExecutorEnv, ProverOpts, Receipt}; use sha3::{Digest, Keccak256}; -#[derive(Clone)] pub struct Risc0ProofReceiptAndImageId { pub image_id: [u8; 32], pub receipt: Receipt, diff --git a/aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs b/aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs index 2ead623b0..d4e8df8b4 100644 --- a/aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs +++ b/aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs @@ -25,7 +25,6 @@ static SP1_PROVER_CLIENT: LazyLock = LazyLock::new(ProverClient::from static SP1_PROVER_CLIENT_CPU: LazyLock = LazyLock::new(|| ProverClient::builder().cpu().build()); -#[derive(Clone)] pub struct SP1ProofWithPubValuesAndVk { pub proof_with_pub_values: SP1ProofWithPublicValues, pub vk: SP1VerifyingKey, diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index bc0931949..ca9bdd21c 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -225,7 +225,7 @@ impl ProofAggregator { wait_and_send_proof_to_verify_on_chain( blob.clone(), blob_versioned_hash, - aggregated_proof.clone(), + &aggregated_proof, self.proof_aggregation_service.clone(), self.sp1_chunk_aggregator_vk_hash_bytes, self.risc0_chunk_aggregator_image_id_bytes, diff --git a/aggregation_mode/proof_aggregator/src/backend/retry.rs b/aggregation_mode/proof_aggregator/src/backend/retry.rs index ce1172944..79e2dc217 100644 --- a/aggregation_mode/proof_aggregator/src/backend/retry.rs +++ b/aggregation_mode/proof_aggregator/src/backend/retry.rs @@ -112,7 +112,7 @@ async fn wait_until_can_submit_aggregated_proof( pub async fn wait_and_send_proof_to_verify_on_chain( blob: BlobTransactionSidecar, blob_versioned_hash: [u8; 32], - aggregated_proof: AlignedProof, + aggregated_proof: &AlignedProof, proof_aggregation_service: AlignedProofAggregationServiceContract, sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], risc0_chunk_aggregator_image_id_bytes: [u8; 32], @@ -141,7 +141,7 @@ pub async fn wait_and_send_proof_to_verify_on_chain( .verifyAggregationRisc0( blob_versioned_hash.into(), encoded_seal.into(), - proof.receipt.journal.bytes.into(), + proof.receipt.journal.bytes.clone().into(), risc0_chunk_aggregator_image_id_bytes.into(), ) .sidecar(blob) From 54f6fea035fa15fa8f28aa0312c040a767f8bf8e Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 14:44:19 -0300 Subject: [PATCH 09/16] Use tokio sleep instead of std sleep --- aggregation_mode/proof_aggregator/src/backend/retry.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/retry.rs b/aggregation_mode/proof_aggregator/src/backend/retry.rs index 79e2dc217..034336d0d 100644 --- a/aggregation_mode/proof_aggregator/src/backend/retry.rs +++ b/aggregation_mode/proof_aggregator/src/backend/retry.rs @@ -17,7 +17,7 @@ use alloy::{ rpc::types::TransactionReceipt, }; use risc0_ethereum_contracts::encode_seal; -use std::thread::sleep; +use tokio::time::sleep; #[derive(Debug)] pub enum RetryError { @@ -79,6 +79,9 @@ async fn wait_until_can_submit_aggregated_proof( // should be considered over a 24h period. let mut time_elapsed = Duration::from_secs(24 * 3600); + // Sleep for 3 minutes (15 blocks) before re-evaluating on each iteration + let time_to_sleep = Duration::from_secs(180); + // Iterate until we can send the proof on-chain loop { // Fetch gas price from network @@ -100,10 +103,8 @@ async fn wait_until_can_submit_aggregated_proof( info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints."); } - // Sleep for 3 minutes (15 blocks) before re-evaluating - let time_to_sleep = Duration::from_secs(180); time_elapsed += time_to_sleep; - sleep(time_to_sleep); + sleep(time_to_sleep).await; } Ok(()) From 68e7c5bd8507295bd9a7fce7027115aa4995b193 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 14:45:09 -0300 Subject: [PATCH 10/16] Minor style changes --- .../proof_aggregator/src/backend/helpers.rs | 10 +++++----- aggregation_mode/proof_aggregator/src/backend/mod.rs | 10 +++------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/helpers.rs b/aggregation_mode/proof_aggregator/src/backend/helpers.rs index f8f3a40ac..f23102a76 100644 --- a/aggregation_mode/proof_aggregator/src/backend/helpers.rs +++ b/aggregation_mode/proof_aggregator/src/backend/helpers.rs @@ -1,6 +1,9 @@ use alloy::primitives::{utils::parse_ether, U256}; use std::time::Duration; +// We assume a fixed gas cost of 300,000 for each of the 2 transactions +const ON_CHAIN_COST_IN_GAS_UNITS: u64 = 600_000u64; + /// Decides whether to send the aggregated proof to be verified on-chain based on /// time elapsed since last submission and monthly ETH budget. /// We make a linear function with the eth to spend this month and the time elapsed since last submission. @@ -10,15 +13,12 @@ pub fn should_send_proof_to_verify_on_chain( monthly_eth_budget: f64, network_gas_price: U256, ) -> bool { - // We assume a fixed gas cost of 300,000 for each of the 2 transactions - const ON_CHAIN_COST_IN_GAS_UNITS: u64 = 600_000u64; - let on_chain_cost_in_gas: U256 = U256::from(ON_CHAIN_COST_IN_GAS_UNITS); - let max_to_spend_in_wei = max_to_spend_in_wei(time_elapsed, monthly_eth_budget); + let max_to_spend_wei = max_to_spend_in_wei(time_elapsed, monthly_eth_budget); let expected_cost_in_wei = network_gas_price * on_chain_cost_in_gas; - expected_cost_in_wei <= max_to_spend_in_wei + expected_cost_in_wei <= max_to_spend_wei } fn max_to_spend_in_wei(time_elapsed: Duration, monthly_eth_budget: f64) -> U256 { diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index ca9bdd21c..fe95a472b 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -179,11 +179,7 @@ impl ProofAggregator { ); let receipt = self - .bump_and_send_proof_to_verify_on_chain_retryable( - blob, - blob_versioned_hash, - aggregated_proof, - ) + .wait_and_send_proof_on_chain_retryable(blob, blob_versioned_hash, aggregated_proof) .await?; info!( "Proof sent and verified, tx hash {:?}", @@ -214,7 +210,7 @@ impl ProofAggregator { Ok(()) } - async fn bump_and_send_proof_to_verify_on_chain_retryable( + async fn wait_and_send_proof_on_chain_retryable( &self, blob: BlobTransactionSidecar, blob_versioned_hash: [u8; 32], @@ -239,7 +235,7 @@ impl ProofAggregator { ) .await .map_err(|e| { - error!("Could't get nonce: {:?}", e); + error!("Couldn't get nonce: {:?}", e); e.inner() }) } From 5349bcc77ebe076391d3db28933c0b573929bfa7 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 15:24:39 -0300 Subject: [PATCH 11/16] Use permanent retry case --- aggregation_mode/proof_aggregator/src/backend/retry.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/retry.rs b/aggregation_mode/proof_aggregator/src/backend/retry.rs index 034336d0d..cf425b668 100644 --- a/aggregation_mode/proof_aggregator/src/backend/retry.rs +++ b/aggregation_mode/proof_aggregator/src/backend/retry.rs @@ -22,14 +22,14 @@ use tokio::time::sleep; #[derive(Debug)] pub enum RetryError { Transient(E), - // Permanent(E), + Permanent(E), } impl std::fmt::Display for RetryError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { RetryError::Transient(e) => write!(f, "{e}"), - //RetryError::Permanent(e) => write!(f, "{e}"), + RetryError::Permanent(e) => write!(f, "{e}"), } } } @@ -38,7 +38,7 @@ impl RetryError { pub fn inner(self) -> E { match self { RetryError::Transient(e) => e, - //RetryError::Permanent(e) => e, + RetryError::Permanent(e) => e, } } } @@ -137,7 +137,7 @@ pub async fn wait_and_send_proof_to_verify_on_chain( AlignedProof::Risc0(proof) => { let encoded_seal = encode_seal(&proof.receipt) .map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string())) - .map_err(RetryError::Transient)?; + .map_err(RetryError::Permanent)?; proof_aggregation_service .verifyAggregationRisc0( blob_versioned_hash.into(), From 5587f2d3b0edd43fae7f1c831756f64de4271de7 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Thu, 18 Dec 2025 18:39:34 -0300 Subject: [PATCH 12/16] Add logs to track the retries --- aggregation_mode/proof_aggregator/src/backend/retry.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/aggregation_mode/proof_aggregator/src/backend/retry.rs b/aggregation_mode/proof_aggregator/src/backend/retry.rs index cf425b668..df3dafed5 100644 --- a/aggregation_mode/proof_aggregator/src/backend/retry.rs +++ b/aggregation_mode/proof_aggregator/src/backend/retry.rs @@ -75,6 +75,8 @@ async fn wait_until_can_submit_aggregated_proof( proof_aggregation_service: AlignedProofAggregationServiceContract, monthly_budget_eth: f64, ) -> Result<(), RetryError> { + info!("Started waiting until we can submit the aggregated proof."); + // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed // should be considered over a 24h period. let mut time_elapsed = Duration::from_secs(24 * 3600); @@ -93,6 +95,8 @@ async fn wait_until_can_submit_aggregated_proof( RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string())) })?; + info!("Fetched gas price from network: {gas_price}"); + if helpers::should_send_proof_to_verify_on_chain( time_elapsed, monthly_budget_eth, From 5a728d1b25cbbb0491f527c1fee1520da4727038 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Mon, 22 Dec 2025 15:00:50 -0300 Subject: [PATCH 13/16] Move the retryables functions back to the ProofAggregatorStruct --- .../proof_aggregator/src/backend/mod.rs | 168 +++++++++++++++++- .../proof_aggregator/src/backend/retry.rs | 146 --------------- 2 files changed, 159 insertions(+), 155 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index fe95a472b..d351a0de8 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -10,7 +10,8 @@ use crate::{ aggregators::{AlignedProof, ProofAggregationError, ZKVMEngine}, backend::{ db::{Db, DbError}, - retry::{retry_function, wait_and_send_proof_to_verify_on_chain}, + retry::{retry_function, RetryError}, + types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract}, }, }; @@ -19,22 +20,24 @@ use aligned_sdk::common::constants::{ ETHEREUM_CALL_MIN_RETRY_DELAY, }; use alloy::{ - consensus::BlobTransactionSidecar, - eips::eip4844::BYTES_PER_BLOB, + consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar}, + eips::{eip4844::BYTES_PER_BLOB, eip7594::BlobTransactionSidecarEip7594, Encodable2718}, hex, network::EthereumWallet, - primitives::{utils::parse_ether, Address}, - providers::{PendingTransactionError, ProviderBuilder}, + primitives::{utils::parse_ether, Address, U256}, + providers::{PendingTransactionError, Provider, ProviderBuilder}, rpc::types::TransactionReceipt, signers::local::LocalSigner, }; use config::Config; use fetcher::{ProofsFetcher, ProofsFetcherError}; use merkle_tree::compute_proofs_merkle_root; +use risc0_ethereum_contracts::encode_seal; use sqlx::types::Uuid; -use std::str::FromStr; -use tracing::{error, info, warn}; -use types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract}; +use std::{str::FromStr, time::Duration}; +use tokio::time::sleep; +use tracing::info; +use tracing::{error, warn}; #[derive(Debug)] pub enum AggregatedProofSubmissionError { @@ -218,7 +221,7 @@ impl ProofAggregator { ) -> Result { retry_function( || { - wait_and_send_proof_to_verify_on_chain( + Self::wait_and_send_proof_to_verify_on_chain( blob.clone(), blob_versioned_hash, &aggregated_proof, @@ -311,6 +314,153 @@ impl ProofAggregator { Ok((blob, blob_versioned_hash)) } + + async fn wait_until_can_submit_aggregated_proof( + proof_aggregation_service: AlignedProofAggregationServiceContract, + monthly_budget_eth: f64, + ) -> Result<(), RetryError> { + info!("Started waiting until we can submit the aggregated proof."); + + // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed + // should be considered over a 24h period. + let mut time_elapsed = Duration::from_secs(24 * 3600); + + // Sleep for 3 minutes (15 blocks) before re-evaluating on each iteration + let time_to_sleep = Duration::from_secs(180); + + // Iterate until we can send the proof on-chain + loop { + // Fetch gas price from network + let gas_price = proof_aggregation_service + .provider() + .get_gas_price() + .await + .map_err(|e| { + RetryError::Transient(AggregatedProofSubmissionError::GasPriceError( + e.to_string(), + )) + })?; + + info!("Fetched gas price from network: {gas_price}"); + + if helpers::should_send_proof_to_verify_on_chain( + time_elapsed, + monthly_budget_eth, + U256::from(gas_price), + ) { + break; + } else { + info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints."); + } + + time_elapsed += time_to_sleep; + sleep(time_to_sleep).await; + } + + Ok(()) + } + + pub async fn wait_and_send_proof_to_verify_on_chain( + blob: BlobTransactionSidecar, + blob_versioned_hash: [u8; 32], + aggregated_proof: &AlignedProof, + proof_aggregation_service: AlignedProofAggregationServiceContract, + sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], + risc0_chunk_aggregator_image_id_bytes: [u8; 32], + monthly_budget_eth: f64, + ) -> Result> { + Self::wait_until_can_submit_aggregated_proof( + proof_aggregation_service.clone(), + monthly_budget_eth, + ) + .await?; + + info!("Sending proof to ProofAggregationService contract..."); + + let tx_req = match aggregated_proof { + AlignedProof::SP1(proof) => proof_aggregation_service + .verifyAggregationSP1( + blob_versioned_hash.into(), + proof.proof_with_pub_values.public_values.to_vec().into(), + proof.proof_with_pub_values.bytes().into(), + sp1_chunk_aggregator_vk_hash_bytes.into(), + ) + .sidecar(blob) + .into_transaction_request(), + AlignedProof::Risc0(proof) => { + let encoded_seal = encode_seal(&proof.receipt) + .map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string())) + .map_err(RetryError::Permanent)?; + proof_aggregation_service + .verifyAggregationRisc0( + blob_versioned_hash.into(), + encoded_seal.into(), + proof.receipt.journal.bytes.clone().into(), + risc0_chunk_aggregator_image_id_bytes.into(), + ) + .sidecar(blob) + .into_transaction_request() + } + }; + + let provider = proof_aggregation_service.provider(); + let envelope = provider + .fill(tx_req) + .await + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( + err.to_string(), + ) + }) + .map_err(RetryError::Transient)? + .try_into_envelope() + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( + err.to_string(), + ) + }) + .map_err(RetryError::Transient)?; + let tx: EthereumTxEnvelope> = envelope + .try_into_pooled() + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( + err.to_string(), + ) + }) + .map_err(RetryError::Transient)? + .try_map_eip4844(|tx| { + tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get())) + }) + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( + err.to_string(), + ) + }) + .map_err(RetryError::Transient)?; + + let encoded_tx = tx.encoded_2718(); + let pending_tx = provider + .send_raw_transaction(&encoded_tx) + .await + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( + err.to_string(), + ) + }) + .map_err(RetryError::Transient)?; + + let receipt = pending_tx + .get_receipt() + .await + .map_err(|err| { + AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction( + err.to_string(), + ) + }) + .map_err(RetryError::Transient)?; + + Ok(receipt) + } } #[cfg(test)] diff --git a/aggregation_mode/proof_aggregator/src/backend/retry.rs b/aggregation_mode/proof_aggregator/src/backend/retry.rs index df3dafed5..6e3a454a6 100644 --- a/aggregation_mode/proof_aggregator/src/backend/retry.rs +++ b/aggregation_mode/proof_aggregator/src/backend/retry.rs @@ -2,22 +2,6 @@ use backon::ExponentialBuilder; use backon::Retryable; use std::future::Future; use std::time::Duration; -use tracing::info; - -use crate::aggregators::AlignedProof; -use crate::backend::types::AlignedProofAggregationServiceContract; -use crate::backend::AggregatedProofSubmissionError; - -use crate::backend::helpers; -use alloy::{ - consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar}, - eips::{eip7594::BlobTransactionSidecarEip7594, Encodable2718}, - primitives::U256, - providers::Provider, - rpc::types::TransactionReceipt, -}; -use risc0_ethereum_contracts::encode_seal; -use tokio::time::sleep; #[derive(Debug)] pub enum RetryError { @@ -70,133 +54,3 @@ where .when(|e| matches!(e, RetryError::Transient(_))) .await } - -async fn wait_until_can_submit_aggregated_proof( - proof_aggregation_service: AlignedProofAggregationServiceContract, - monthly_budget_eth: f64, -) -> Result<(), RetryError> { - info!("Started waiting until we can submit the aggregated proof."); - - // We start on 24 hours because the proof aggregator runs once a day, so the time elapsed - // should be considered over a 24h period. - let mut time_elapsed = Duration::from_secs(24 * 3600); - - // Sleep for 3 minutes (15 blocks) before re-evaluating on each iteration - let time_to_sleep = Duration::from_secs(180); - - // Iterate until we can send the proof on-chain - loop { - // Fetch gas price from network - let gas_price = proof_aggregation_service - .provider() - .get_gas_price() - .await - .map_err(|e| { - RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string())) - })?; - - info!("Fetched gas price from network: {gas_price}"); - - if helpers::should_send_proof_to_verify_on_chain( - time_elapsed, - monthly_budget_eth, - U256::from(gas_price), - ) { - break; - } else { - info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints."); - } - - time_elapsed += time_to_sleep; - sleep(time_to_sleep).await; - } - - Ok(()) -} - -pub async fn wait_and_send_proof_to_verify_on_chain( - blob: BlobTransactionSidecar, - blob_versioned_hash: [u8; 32], - aggregated_proof: &AlignedProof, - proof_aggregation_service: AlignedProofAggregationServiceContract, - sp1_chunk_aggregator_vk_hash_bytes: [u8; 32], - risc0_chunk_aggregator_image_id_bytes: [u8; 32], - monthly_budget_eth: f64, -) -> Result> { - wait_until_can_submit_aggregated_proof(proof_aggregation_service.clone(), monthly_budget_eth) - .await?; - - info!("Sending proof to ProofAggregationService contract..."); - - let tx_req = match aggregated_proof { - AlignedProof::SP1(proof) => proof_aggregation_service - .verifyAggregationSP1( - blob_versioned_hash.into(), - proof.proof_with_pub_values.public_values.to_vec().into(), - proof.proof_with_pub_values.bytes().into(), - sp1_chunk_aggregator_vk_hash_bytes.into(), - ) - .sidecar(blob) - .into_transaction_request(), - AlignedProof::Risc0(proof) => { - let encoded_seal = encode_seal(&proof.receipt) - .map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string())) - .map_err(RetryError::Permanent)?; - proof_aggregation_service - .verifyAggregationRisc0( - blob_versioned_hash.into(), - encoded_seal.into(), - proof.receipt.journal.bytes.clone().into(), - risc0_chunk_aggregator_image_id_bytes.into(), - ) - .sidecar(blob) - .into_transaction_request() - } - }; - - let provider = proof_aggregation_service.provider(); - let envelope = provider - .fill(tx_req) - .await - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)? - .try_into_envelope() - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)?; - let tx: EthereumTxEnvelope> = envelope - .try_into_pooled() - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)? - .try_map_eip4844(|tx| { - tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get())) - }) - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)?; - - let encoded_tx = tx.encoded_2718(); - let pending_tx = provider - .send_raw_transaction(&encoded_tx) - .await - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)?; - - let receipt = pending_tx - .get_receipt() - .await - .map_err(|err| { - AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string()) - }) - .map_err(RetryError::Transient)?; - - Ok(receipt) -} From c380944e139c52ee7aadc9667dc82750aa90a04c Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Mon, 22 Dec 2025 15:03:01 -0300 Subject: [PATCH 14/16] rename the helpers module to eth --- .../proof_aggregator/src/backend/{helpers.rs => eth.rs} | 0 aggregation_mode/proof_aggregator/src/backend/mod.rs | 6 +++--- 2 files changed, 3 insertions(+), 3 deletions(-) rename aggregation_mode/proof_aggregator/src/backend/{helpers.rs => eth.rs} (100%) diff --git a/aggregation_mode/proof_aggregator/src/backend/helpers.rs b/aggregation_mode/proof_aggregator/src/backend/eth.rs similarity index 100% rename from aggregation_mode/proof_aggregator/src/backend/helpers.rs rename to aggregation_mode/proof_aggregator/src/backend/eth.rs diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index d351a0de8..1136fffa5 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -1,7 +1,7 @@ pub mod config; mod db; +mod eth; pub mod fetcher; -mod helpers; mod merkle_tree; mod retry; mod types; @@ -343,7 +343,7 @@ impl ProofAggregator { info!("Fetched gas price from network: {gas_price}"); - if helpers::should_send_proof_to_verify_on_chain( + if eth::should_send_proof_to_verify_on_chain( time_elapsed, monthly_budget_eth, U256::from(gas_price), @@ -469,7 +469,7 @@ mod tests { use super::*; use alloy::primitives::U256; - use helpers::should_send_proof_to_verify_on_chain; + use eth::should_send_proof_to_verify_on_chain; use std::time::Duration; #[test] From e873b90b338170a3c65a744296f238d098351034 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Mon, 22 Dec 2025 15:12:07 -0300 Subject: [PATCH 15/16] Move the should send tests to the eth module --- .../proof_aggregator/src/backend/eth.rs | 108 +++++++++++++++++ .../proof_aggregator/src/backend/mod.rs | 111 ------------------ 2 files changed, 108 insertions(+), 111 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/eth.rs b/aggregation_mode/proof_aggregator/src/backend/eth.rs index f23102a76..097863764 100644 --- a/aggregation_mode/proof_aggregator/src/backend/eth.rs +++ b/aggregation_mode/proof_aggregator/src/backend/eth.rs @@ -34,3 +34,111 @@ fn max_to_spend_in_wei(time_elapsed: Duration, monthly_eth_budget: f64) -> U256 budget_available_per_second_in_wei * elapsed_seconds } + +#[cfg(test)] +mod tests { + use super::should_send_proof_to_verify_on_chain; + use alloy::primitives::U256; + use std::time::Duration; + + #[test] + fn test_should_send_proof_to_verify_on_chain_updated_cases() { + // The should_send_proof_to_verify_on_chain function returns true when: + // gas_price * 600_000 <= (seconds_elapsed) * (monthly_eth_budget / (30 * 24 * 60 * 60)) + + const BUDGET_PER_MONTH_IN_ETH: f64 = 0.15; + const ONE_DAY_SECONDS: u64 = 24 * 60 * 60; + let gas_price = U256::from(1_000_000_000u64); // 1 Gwei + + // Case 1: Base case -> should return true + // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour + // Elapsed Time: 24 hours + // Gas Price: 1 Gwei + // Max to spend: 0.000000058 ETH/hour * 24 hours = 0.005 ETH + // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH + // Expected cost < Max to spend, so we can send the proof + assert!(should_send_proof_to_verify_on_chain( + Duration::from_secs(ONE_DAY_SECONDS), // 24 hours + BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget + gas_price, // 1 Gwei gas price + )); + + // Case 2: Slightly Increased Gas Price -> should return false + // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour + // Elapsed Time: 24 hours + // Gas Price: 8 Gwei + // Max to spend: 0.000000058 ETH/hour * 24 hours = 0.005 ETH + // Expected cost: 600,000 * 8 Gwei = 0.0048 ETH + // Expected cost < Max to spend, so we can send the proof + assert!(should_send_proof_to_verify_on_chain( + Duration::from_secs(ONE_DAY_SECONDS), // 24 hours + BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget + U256::from(8_000_000_000u64), // 8 Gwei gas price + )); + + // Case 3: Increased Gas Price -> should return false + // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour + // Elapsed Time: 24 hours + // Gas Price: 10 Gwei + // Max to spend: 0.000000058 ETH/hour * 24 hours = 0.005 ETH + // Expected cost: 600,000 * 10 Gwei = 0.006 ETH + // Expected cost > Max to spend, so we cannot send the proof + assert!(!should_send_proof_to_verify_on_chain( + Duration::from_secs(ONE_DAY_SECONDS), // 24 hours + BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget + U256::from(10_000_000_000u64), // 10 Gwei gas price + )); + + // Case 4: Slightly Reduced Time Elapsed -> should return true + // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour + // Elapsed Time: 2 hours + // Gas Price: 1 Gwei + // Max to spend: 0.000000058 ETH/hour * 3 hours = 0.000625 ETH + // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH + // Expected cost < Max to spend, so we can send the proof + assert!(should_send_proof_to_verify_on_chain( + Duration::from_secs(3 * 3600), // 3 hours + BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget + gas_price, // 1 Gwei gas price + )); + + // Case 5: Reduced Time Elapsed -> should return false + // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour + // Elapsed Time: 1.2 hours + // Gas Price: 1 Gwei + // Max to spend: 0.000000058 ETH/hour * 1.2 hours = 0.00025 ETH + // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH + // Expected cost > Max to spend, so we cannot send the proof + assert!(!should_send_proof_to_verify_on_chain( + Duration::from_secs_f64(1.2 * 3600.0), // 1.2 hours + BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget + gas_price, // 1 Gwei gas price + )); + + // Case 6: Slightly Reduced Monthly Budget -> should return true + // Monthly Budget: 0.1 ETH -> 0.0033 ETH per day -> 0.000000038 ETH per hour + // Elapsed Time: 24 hours + // Gas Price: 1 Gwei + // Max to spend: 0.000000038 ETH/hour * 24 hours = 0.0032832 ETH + // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH + // Expected cost < Max to spend, so we can send the proof + assert!(should_send_proof_to_verify_on_chain( + Duration::from_secs(ONE_DAY_SECONDS), // 24 hours + 0.1, // 0.1 ETH monthly budget + gas_price, // 1 Gwei gas price + )); + + // Case 7: Decreased Monthly Budget -> should return false + // Monthly Budget: 0.01 ETH -> 0.00033 ETH per day -> 0.0000000038 ETH per hour + // Elapsed Time: 24 hours + // Gas Price: 1 Gwei + // Max to spend: 0.0000000038 ETH/hour * 24 hours = 0.00032832 ETH + // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH + // Expected cost > Max to spend, so we cannot send the proof + assert!(!should_send_proof_to_verify_on_chain( + Duration::from_secs(ONE_DAY_SECONDS), // 24 hours + 0.01, // 0.01 ETH monthly budget + gas_price, // 1 Gwei gas price + )); + } +} diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index 1136fffa5..7cfee0d83 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -462,114 +462,3 @@ impl ProofAggregator { Ok(receipt) } } - -#[cfg(test)] -mod tests { - - use super::*; - - use alloy::primitives::U256; - use eth::should_send_proof_to_verify_on_chain; - use std::time::Duration; - - #[test] - fn test_should_send_proof_to_verify_on_chain_updated_cases() { - // The should_send_proof_to_verify_on_chain function returns true when: - // gas_price * 600_000 <= (seconds_elapsed) * (monthly_eth_budget / (30 * 24 * 60 * 60)) - - const BUDGET_PER_MONTH_IN_ETH: f64 = 0.15; - const ONE_DAY_SECONDS: u64 = 24 * 60 * 60; - let gas_price = U256::from(1_000_000_000u64); // 1 Gwei - - // Case 1: Base case -> should return true - // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour - // Elapsed Time: 24 hours - // Gas Price: 1 Gwei - // Max to spend: 0.000000058 ETH/hour * 24 hours = 0.005 ETH - // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH - // Expected cost < Max to spend, so we can send the proof - assert!(should_send_proof_to_verify_on_chain( - Duration::from_secs(ONE_DAY_SECONDS), // 24 hours - BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget - gas_price, // 1 Gwei gas price - )); - - // Case 2: Slightly Increased Gas Price -> should return false - // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour - // Elapsed Time: 24 hours - // Gas Price: 8 Gwei - // Max to spend: 0.000000058 ETH/hour * 24 hours = 0.005 ETH - // Expected cost: 600,000 * 8 Gwei = 0.0048 ETH - // Expected cost < Max to spend, so we can send the proof - assert!(should_send_proof_to_verify_on_chain( - Duration::from_secs(ONE_DAY_SECONDS), // 24 hours - BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget - U256::from(8_000_000_000u64), // 8 Gwei gas price - )); - - // Case 3: Increased Gas Price -> should return false - // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour - // Elapsed Time: 24 hours - // Gas Price: 10 Gwei - // Max to spend: 0.000000058 ETH/hour * 24 hours = 0.005 ETH - // Expected cost: 600,000 * 10 Gwei = 0.006 ETH - // Expected cost > Max to spend, so we cannot send the proof - assert!(!should_send_proof_to_verify_on_chain( - Duration::from_secs(ONE_DAY_SECONDS), // 24 hours - BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget - U256::from(10_000_000_000u64), // 10 Gwei gas price - )); - - // Case 4: Slightly Reduced Time Elapsed -> should return true - // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour - // Elapsed Time: 2 hours - // Gas Price: 1 Gwei - // Max to spend: 0.000000058 ETH/hour * 3 hours = 0.000625 ETH - // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH - // Expected cost < Max to spend, so we can send the proof - assert!(should_send_proof_to_verify_on_chain( - Duration::from_secs(3 * 3600), // 3 hours - BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget - gas_price, // 1 Gwei gas price - )); - - // Case 5: Reduced Time Elapsed -> should return false - // Monthly Budget: 0.15 ETH -> 0.005 ETH per day -> 0.000000058 ETH per hour - // Elapsed Time: 1.2 hours - // Gas Price: 1 Gwei - // Max to spend: 0.000000058 ETH/hour * 1.2 hours = 0.00025 ETH - // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH - // Expected cost > Max to spend, so we cannot send the proof - assert!(!should_send_proof_to_verify_on_chain( - Duration::from_secs_f64(1.2 * 3600.0), // 1.2 hours - BUDGET_PER_MONTH_IN_ETH, // 0.15 ETH monthly budget - gas_price, // 1 Gwei gas price - )); - - // Case 6: Slightly Reduced Monthly Budget -> should return true - // Monthly Budget: 0.1 ETH -> 0.0033 ETH per day -> 0.000000038 ETH per hour - // Elapsed Time: 24 hours - // Gas Price: 1 Gwei - // Max to spend: 0.000000038 ETH/hour * 24 hours = 0.0032832 ETH - // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH - // Expected cost < Max to spend, so we can send the proof - assert!(should_send_proof_to_verify_on_chain( - Duration::from_secs(ONE_DAY_SECONDS), // 24 hours - 0.1, // 0.1 ETH monthly budget - gas_price, // 1 Gwei gas price - )); - - // Case 7: Decreased Monthly Budget -> should return false - // Monthly Budget: 0.01 ETH -> 0.00033 ETH per day -> 0.0000000038 ETH per hour - // Elapsed Time: 24 hours - // Gas Price: 1 Gwei - // Max to spend: 0.0000000038 ETH/hour * 24 hours = 0.00032832 ETH - // Expected cost: 600,000 * 1 Gwei = 0.0006 ETH - // Expected cost > Max to spend, so we cannot send the proof - assert!(!should_send_proof_to_verify_on_chain( - Duration::from_secs(ONE_DAY_SECONDS), // 24 hours - 0.01, // 0.01 ETH monthly budget - gas_price, // 1 Gwei gas price - )); - } -} From 352cce0b63aa1c1fbc6a98fc9bd7a0232479b661 Mon Sep 17 00:00:00 2001 From: maximopalopoli Date: Tue, 23 Dec 2025 12:19:36 -0300 Subject: [PATCH 16/16] Remove the backon dependency --- aggregation_mode/Cargo.lock | 26 +------- aggregation_mode/proof_aggregator/Cargo.toml | 1 - .../proof_aggregator/src/backend/retry.rs | 64 +++++++++++++------ 3 files changed, 47 insertions(+), 44 deletions(-) diff --git a/aggregation_mode/Cargo.lock b/aggregation_mode/Cargo.lock index 6e424297a..97fc2fe4e 100644 --- a/aggregation_mode/Cargo.lock +++ b/aggregation_mode/Cargo.lock @@ -2017,17 +2017,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "backon" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" -dependencies = [ - "fastrand", - "gloo-timers 0.3.0", - "tokio", -] - [[package]] name = "backtrace" version = "0.3.76" @@ -4448,7 +4437,7 @@ version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" dependencies = [ - "gloo-timers 0.2.6", + "gloo-timers", "send_wrapper 0.4.0", ] @@ -4634,18 +4623,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "gloo-timers" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - [[package]] name = "group" version = "0.12.1" @@ -7156,7 +7133,6 @@ version = "0.1.0" dependencies = [ "aligned-sdk", "alloy", - "backon", "bincode", "c-kzg", "ciborium", diff --git a/aggregation_mode/proof_aggregator/Cargo.toml b/aggregation_mode/proof_aggregator/Cargo.toml index 2c367a144..294973f80 100644 --- a/aggregation_mode/proof_aggregator/Cargo.toml +++ b/aggregation_mode/proof_aggregator/Cargo.toml @@ -21,7 +21,6 @@ reqwest = { version = "0.12" } ciborium = "=0.2.2" lambdaworks-crypto = { git = "https://github.com/lambdaclass/lambdaworks.git", rev = "5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b", features = ["serde"]} rayon = "1.10.0" -backon = "1.2.0" sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "uuid", "bigdecimal" ] } # zkvms diff --git a/aggregation_mode/proof_aggregator/src/backend/retry.rs b/aggregation_mode/proof_aggregator/src/backend/retry.rs index 6e3a454a6..cc1a567b3 100644 --- a/aggregation_mode/proof_aggregator/src/backend/retry.rs +++ b/aggregation_mode/proof_aggregator/src/backend/retry.rs @@ -1,5 +1,3 @@ -use backon::ExponentialBuilder; -use backon::Retryable; use std::future::Future; use std::time::Duration; @@ -29,28 +27,58 @@ impl RetryError { impl std::error::Error for RetryError where E: std::fmt::Debug {} -/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function -/// Runs with `jitter: false`. pub async fn retry_function( - function: FutureFn, - min_delay: u64, + mut function: FutureFn, + min_delay_ms: u64, factor: f32, max_times: usize, - max_delay: u64, + max_delay_seconds: u64, ) -> Result> where Fut: Future>>, FutureFn: FnMut() -> Fut, { - let backoff = ExponentialBuilder::default() - .with_min_delay(Duration::from_millis(min_delay)) - .with_max_times(max_times) - .with_factor(factor) - .with_max_delay(Duration::from_secs(max_delay)); - - function - .retry(backoff) - .sleep(tokio::time::sleep) - .when(|e| matches!(e, RetryError::Transient(_))) - .await + let mut delay = Duration::from_millis(min_delay_ms); + + let factor = (factor as f64).max(1.0); + + let mut attempt: usize = 0; + + loop { + match function().await { + Ok(v) => return Ok(v), + Err(RetryError::Permanent(e)) => return Err(RetryError::Permanent(e)), + Err(RetryError::Transient(e)) => { + if attempt >= max_times { + return Err(RetryError::Transient(e)); + } + + tokio::time::sleep(delay).await; + + delay = next_backoff_delay(delay, max_delay_seconds, factor); + + attempt += 1; + } + } + } +} + +/// TODO: Replace with the one in aggregation_mode/db/src/orchestrator.rs, or use a common method. +fn next_backoff_delay(current_delay: Duration, max_delay_seconds: u64, factor: f64) -> Duration { + let max: Duration = Duration::from_secs(max_delay_seconds); + // Defensive: factor should be >= 1.0 for backoff, we clamp it to avoid shrinking/NaN. + + let scaled_secs = current_delay.as_secs_f64() * factor; + let scaled_secs = if scaled_secs.is_finite() { + scaled_secs + } else { + max.as_secs_f64() + }; + + let scaled = Duration::from_secs_f64(scaled_secs); + if scaled > max { + max + } else { + scaled + } }