diff --git a/crates/blacklight-contract-clients/src/common/mod.rs b/crates/blacklight-contract-clients/src/common/mod.rs index a8001f0..8d581c0 100644 --- a/crates/blacklight-contract-clients/src/common/mod.rs +++ b/crates/blacklight-contract-clients/src/common/mod.rs @@ -10,7 +10,7 @@ pub mod event_helper; pub mod tx_submitter; pub async fn overestimate_gas( - call: &CallBuilder<&P, D>, + call: &CallBuilder, ) -> anyhow::Result { // Estimate gas and add a 50% buffer let estimated_gas = call.estimate_gas().await.map_err(|e| { diff --git a/crates/blacklight-contract-clients/src/common/tx_submitter.rs b/crates/blacklight-contract-clients/src/common/tx_submitter.rs index 30f6b63..11a7ecf 100644 --- a/crates/blacklight-contract-clients/src/common/tx_submitter.rs +++ b/crates/blacklight-contract-clients/src/common/tx_submitter.rs @@ -1,3 +1,4 @@ +use crate::common::overestimate_gas; use alloy::{ consensus::Transaction, contract::CallBuilder, primitives::B256, providers::Provider, rpc::types::TransactionReceipt, sol_types::SolInterface, @@ -10,22 +11,22 @@ use tokio::sync::Mutex; use tracing::{info, warn}; #[derive(Clone)] -pub(crate) struct TransactionSubmitter { +pub struct TransactionSubmitter { tx_lock: Arc>, - gas_limit: Option, + gas_buffer: bool, _decoder: PhantomData, } impl TransactionSubmitter { - pub(crate) fn new(tx_lock: Arc>) -> Self { + pub fn new(tx_lock: Arc>) -> Self { Self { tx_lock, - gas_limit: None, + gas_buffer: false, _decoder: PhantomData, } } - pub(crate) async fn invoke(&self, method: &str, call: CallBuilder) -> Result + pub async fn invoke(&self, method: &str, call: CallBuilder) -> Result where P: Provider + Clone, D: alloy::contract::CallDecoder + Clone, @@ -36,9 +37,12 @@ impl TransactionSubmitter { return Err(anyhow!("{method} reverted: {e}")); } - let call = match self.gas_limit { - Some(gas) => call.gas(gas), - None => call, + let (call, gas_limit) = match self.gas_buffer { + true => { + let gas = overestimate_gas(&call).await?; + (call.gas(gas), Some(gas)) + } + false => (call, None), }; let provider = call.provider.clone(); @@ -75,11 +79,11 @@ impl TransactionSubmitter { // Validate success if !receipt.status() { - if let Some(limit) = self.gas_limit { + if let Some(gas_limit) = gas_limit { let used = receipt.gas_used; - if used >= limit { + if used >= gas_limit { return Err(anyhow!( - "{method} ran out of gas (used {used} of {limit} limit). Tx: {tx_hash:?}" + "{method} ran out of gas (used {used} of {gas_limit} limit). Tx: {tx_hash:?}" )); } } @@ -90,9 +94,9 @@ impl TransactionSubmitter { Ok(tx_hash) } - pub(crate) fn with_gas_limit(&self, limit: u64) -> Self { + pub fn with_gas_buffer(&self) -> Self { let mut this = self.clone(); - this.gas_limit = Some(limit); + this.gas_buffer = true; this } diff --git a/crates/blacklight-contract-clients/src/heartbeat_manager.rs b/crates/blacklight-contract-clients/src/heartbeat_manager.rs index 90a07cf..7671b21 100644 --- a/crates/blacklight-contract-clients/src/heartbeat_manager.rs +++ b/crates/blacklight-contract-clients/src/heartbeat_manager.rs @@ -1,8 +1,8 @@ use crate::common::event_helper::{BlockRange, listen_events, listen_events_filtered}; use crate::htx::Htx; use crate::{ - common::{overestimate_gas, tx_submitter::TransactionSubmitter}, - heartbeat_manager::HearbeatManager::HearbeatManagerInstance, + common::tx_submitter::TransactionSubmitter, + heartbeat_manager::HeartbeatManager::HeartbeatManagerInstance, }; use alloy::{ primitives::{Address, B256, U256, keccak256}, @@ -23,7 +23,7 @@ sol! { #[sol(rpc)] #[derive(Debug)] - contract HearbeatManager { + contract HeartbeatManager { error ZeroAddress(); error NotPending(); error RoundClosed(); @@ -90,21 +90,22 @@ sol! { } } -pub type RoundStartedEvent = HearbeatManager::RoundStarted; -pub type OperatorVotedEvent = HearbeatManager::OperatorVoted; -pub type HeartbeatEnqueuedEvent = HearbeatManager::HeartbeatEnqueued; -pub type RoundFinalizedEvent = HearbeatManager::RoundFinalized; -pub type RewardsDistributedEvent = HearbeatManager::RewardsDistributed; -pub type RewardDistributionAbandonedEvent = HearbeatManager::RewardDistributionAbandoned; -pub type SlashingCallbackFailedEvent = HearbeatManager::SlashingCallbackFailed; +pub type RoundStartedEvent = HeartbeatManager::RoundStarted; +pub type OperatorVotedEvent = HeartbeatManager::OperatorVoted; +pub type HeartbeatEnqueuedEvent = HeartbeatManager::HeartbeatEnqueued; +pub type RoundFinalizedEvent = HeartbeatManager::RoundFinalized; +pub type RewardsDistributedEvent = HeartbeatManager::RewardsDistributed; +pub type RewardDistributionAbandonedEvent = HeartbeatManager::RewardDistributionAbandoned; +pub type SlashingCallbackFailedEvent = HeartbeatManager::SlashingCallbackFailed; +pub type HeartbeatManagerErrors = HeartbeatManager::HeartbeatManagerErrors; /// WebSocket-based client for real-time event streaming and contract interaction with /// HeartbeatManager #[derive(Clone)] pub struct HeartbeatManagerClient { provider: P, - contract: HearbeatManagerInstance

, - submitter: TransactionSubmitter, + contract: HeartbeatManagerInstance

, + submitter: TransactionSubmitter, block_lookback: u64, } @@ -112,7 +113,7 @@ impl HeartbeatManagerClient

{ /// Create a new WebSocket client from ContractConfig pub fn new(provider: P, config: super::ContractConfig, tx_lock: Arc>) -> Self { let contract = - HearbeatManagerInstance::new(config.manager_contract_address, provider.clone()); + HeartbeatManagerInstance::new(config.manager_contract_address, provider.clone()); let submitter = TransactionSubmitter::new(tx_lock); Self { provider, @@ -166,9 +167,8 @@ impl HeartbeatManagerClient

{ let snapshot_id = snapshot_id.saturating_sub(1); let raw_htx = alloy::primitives::Bytes::try_from(htx)?; let call = self.contract.submitHeartbeat(raw_htx, snapshot_id); - let gas_with_buffer = overestimate_gas(&call).await?; self.submitter - .with_gas_limit(gas_with_buffer) + .with_gas_buffer() .invoke("submitHeartbeat", call) .await } @@ -190,9 +190,8 @@ impl HeartbeatManagerClient

{ let call = self .contract .submitVerdict(event.heartbeatKey, verdict, proofs); - let gas_with_buffer = overestimate_gas(&call).await?; self.submitter - .with_gas_limit(gas_with_buffer) + .with_gas_buffer() .invoke("submitVerdict", call) .await } diff --git a/crates/blacklight-contract-clients/src/lib.rs b/crates/blacklight-contract-clients/src/lib.rs index 1b615de..8d046b0 100644 --- a/crates/blacklight-contract-clients/src/lib.rs +++ b/crates/blacklight-contract-clients/src/lib.rs @@ -23,7 +23,7 @@ pub use staking_operators::StakingOperatorsClient; // ============================================================================ // Heartbeat manager events -pub use heartbeat_manager::HearbeatManager; +pub use heartbeat_manager::HeartbeatManager; // ProtocolConfig events pub use protocol_config::ProtocolConfig; diff --git a/keeper/src/clients.rs b/keeper/src/clients.rs index 354ada9..2c4db89 100644 --- a/keeper/src/clients.rs +++ b/keeper/src/clients.rs @@ -5,9 +5,9 @@ use alloy::{ providers::{DynProvider, Provider, ProviderBuilder, WsConnect}, signers::local::PrivateKeySigner, }; -use blacklight_contract_clients::{HearbeatManager, StakingOperators}; +use blacklight_contract_clients::{HeartbeatManager, StakingOperators}; -pub type HeartbeatManagerInstance = HearbeatManager::HearbeatManagerInstance; +pub type HeartbeatManagerInstance = HeartbeatManager::HeartbeatManagerInstance; pub type StakingOperatorsInstance = StakingOperators::StakingOperatorsInstance; pub type JailingPolicyInstance = JailingPolicy::JailingPolicyInstance; pub type EmissionsControllerInstance = diff --git a/keeper/src/l2/escalator.rs b/keeper/src/l2/escalator.rs index 361ea7d..fda3044 100644 --- a/keeper/src/l2/escalator.rs +++ b/keeper/src/l2/escalator.rs @@ -1,6 +1,9 @@ use crate::{clients::L2KeeperClient, l2::KeeperState, metrics}; use alloy::primitives::{B256, Bytes}; -use blacklight_contract_clients::common::errors::decode_any_error; +use blacklight_contract_clients::{ + common::{errors::decode_any_error, tx_submitter::TransactionSubmitter}, + heartbeat_manager::HeartbeatManagerErrors, +}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; use tracing::{info, warn}; @@ -8,11 +11,16 @@ use tracing::{info, warn}; pub(crate) struct RoundEscalator { client: Arc, state: Arc>, + submitter: TransactionSubmitter, } impl RoundEscalator { pub(crate) fn new(client: Arc, state: Arc>) -> Self { - Self { client, state } + Self { + client, + state, + submitter: TransactionSubmitter::new(Default::default()), + } } pub(crate) async fn process_escalations(&self, block_timestamp: u64) -> anyhow::Result<()> { @@ -78,12 +86,11 @@ impl RoundEscalator { .heartbeat_manager() .escalateOrExpire(heartbeat_key, raw_htx.clone()); - match call.send().await { - Ok(pending) => { - let receipt = pending.get_receipt().await?; + match self.submitter.invoke("escalateOrExpire", call).await { + Ok(tx_hash) => { info!( heartbeat_key = ?heartbeat_key, - tx_hash = ?receipt.transaction_hash, + tx_hash = ?tx_hash, "Escalate/expire confirmed" ); metrics::get().l2.escalations.inc_escalations(); @@ -115,12 +122,11 @@ impl RoundEscalator { .heartbeat_manager() .escalateOrExpire(heartbeat_key, raw_htx.clone()); - match call.send().await { - Ok(pending) => { - let receipt = pending.get_receipt().await?; + match self.submitter.invoke("escalateOrExpire", call).await { + Ok(tx_hash) => { info!( heartbeat_key = ?heartbeat_key, - tx_hash = ?receipt.transaction_hash, + tx_hash = ?tx_hash, "Escalate/expire confirmed" ); metrics::get().l2.escalations.inc_escalations(); diff --git a/keeper/src/l2/events.rs b/keeper/src/l2/events.rs index 29264b4..75e74ff 100644 --- a/keeper/src/l2/events.rs +++ b/keeper/src/l2/events.rs @@ -6,7 +6,7 @@ use crate::{ use alloy::{primitives::B256, rpc::types::Log, sol_types::SolEvent}; use anyhow::Context; use blacklight_contract_clients::{ - HearbeatManager::SlashingCallbackFailed, + HeartbeatManager::SlashingCallbackFailed, heartbeat_manager::{ HeartbeatEnqueuedEvent, RewardDistributionAbandonedEvent, RewardsDistributedEvent, RoundFinalizedEvent, RoundStartedEvent, SlashingCallbackFailedEvent, diff --git a/keeper/src/l2/rewards.rs b/keeper/src/l2/rewards.rs index 5ebf77e..f4b8a25 100644 --- a/keeper/src/l2/rewards.rs +++ b/keeper/src/l2/rewards.rs @@ -7,7 +7,8 @@ use alloy::primitives::{Address, U256, map::HashMap, utils::format_units}; use anyhow::{Context, anyhow, bail}; use blacklight_contract_clients::{ ProtocolConfig::ProtocolConfigInstance, - common::{errors::decode_any_error, overestimate_gas}, + common::{errors::decode_any_error, tx_submitter::TransactionSubmitter}, + heartbeat_manager::HeartbeatManagerErrors, }; use std::sync::Arc; use tokio::sync::Mutex; @@ -27,6 +28,7 @@ struct TokenContext { #[derive(Clone, Copy)] struct RewardsContext { token: TokenContext, + // These are block timestamps checked_at: u64, synced_at: u64, spendable: U256, @@ -37,6 +39,7 @@ pub(crate) struct RewardsDistributor { client: Arc, state: Arc>, rewards_context: HashMap, + submitter: TransactionSubmitter, } impl RewardsDistributor { @@ -45,6 +48,7 @@ impl RewardsDistributor { client, state, rewards_context: Default::default(), + submitter: TransactionSubmitter::new(Default::default()).with_gas_buffer(), } } @@ -75,14 +79,8 @@ impl RewardsDistributor { let balance = format_units(balance, token.decimals)?; let sync_limit = format_units(sync_limit, token.decimals)?; info!("Need to sync balance because balance ({balance}) > sync limit ({sync_limit})"); - let receipt = reward_policy - .sync() - .send() - .await - .context("Failed to sync")? - .get_receipt() - .await?; - info!(tx_hash = ?receipt.transaction_hash, "Reward policy synced"); + let tx_hash = self.submitter.invoke("sync", reward_policy.sync()).await?; + info!(tx_hash = ?tx_hash, "Reward policy synced"); } Ok(()) } @@ -133,12 +131,10 @@ impl RewardsDistributor { self.client .heartbeat_manager() .distributeRewards(key.heartbeat_key, key.round, voters); - let gas_with_buffer = overestimate_gas(&call).await?; - match call.gas(gas_with_buffer).send().await { - Ok(pending) => { - let receipt = pending.get_receipt().await?; + match self.submitter.invoke("distributeRewards", call).await { + Ok(tx_hash) => { info!( - tx_hash = ?receipt.transaction_hash, + tx_hash = ?tx_hash, "Rewards distributed" ); let mut state = self.state.lock().await; @@ -164,7 +160,7 @@ impl RewardsDistributor { warn!("Reward policy address is zero, skipping"); return Ok(false); } - + let submitter = self.submitter.clone(); let reward_policy = self.client.reward_policy(reward_address); let ctx = self.fetch_token_context(reward_address).await?; if ctx.checked_at != block_timestamp { @@ -207,11 +203,10 @@ impl RewardsDistributor { ctx.synced_at = block_timestamp; info!("Reward budget unlocking, syncing policy",); - match reward_policy.sync().send().await { - Ok(pending) => { - let receipt = pending.get_receipt().await?; + match submitter.invoke("sync", reward_policy.sync()).await { + Ok(tx_hash) => { info!( - tx_hash = ?receipt.transaction_hash, + tx_hash = ?tx_hash, "Reward policy synced" ); } diff --git a/keeper/src/l2/supervisor.rs b/keeper/src/l2/supervisor.rs index e1cbc73..1b6930a 100644 --- a/keeper/src/l2/supervisor.rs +++ b/keeper/src/l2/supervisor.rs @@ -138,7 +138,7 @@ impl L2Supervisor { .distribute_rewards(block_timestamp, key, outcome, members) .await { - error!("Failed to process jailing: {e}"); + error!("Failed to process reward distribution: {e}"); } }