diff --git a/keeper/src/l2/mod.rs b/keeper/src/l2/mod.rs index ce57427..6e81ce7 100644 --- a/keeper/src/l2/mod.rs +++ b/keeper/src/l2/mod.rs @@ -1,4 +1,4 @@ -use alloy::primitives::{Address, B256, Bytes, U256}; +use alloy::primitives::{Address, B256, Bytes}; use std::collections::HashMap; mod escalator; @@ -21,41 +21,12 @@ struct RoundState { raw_htx: Option, deadline: Option, outcome: Option, - round_info: Option, rewards_done: bool, - reward_sync_attempted: bool, jailing_done: bool, } -#[derive(Debug, Clone, Copy)] -struct RoundInfoView { - reward: Address, - valid_stake: U256, - invalid_stake: U256, -} - -#[derive(Debug, Clone)] -struct RewardPolicyCache { - last_checked_at: Option, - last_budget: Option, - last_remaining: Option, - last_sync_attempt_at: Option, -} - -impl RewardPolicyCache { - fn new() -> Self { - Self { - last_checked_at: None, - last_budget: None, - last_remaining: None, - last_sync_attempt_at: None, - } - } -} - #[derive(Default)] pub struct KeeperState { raw_htx_by_heartbeat: HashMap, rounds: HashMap, - reward_policies: HashMap, } diff --git a/keeper/src/l2/rewards.rs b/keeper/src/l2/rewards.rs index 23c34fe..5ebf77e 100644 --- a/keeper/src/l2/rewards.rs +++ b/keeper/src/l2/rewards.rs @@ -1,17 +1,17 @@ use crate::{ clients::{L2KeeperClient, RewardPolicyInstance}, - l2::{KeeperState, RewardPolicyCache, RoundInfoView, RoundKey}, + l2::{KeeperState, RoundKey}, metrics, }; use alloy::primitives::{Address, U256, map::HashMap, utils::format_units}; -use anyhow::{Context, bail}; +use anyhow::{Context, anyhow, bail}; use blacklight_contract_clients::{ ProtocolConfig::ProtocolConfigInstance, common::{errors::decode_any_error, overestimate_gas}, }; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; const MIN_NIL_SYNC_THRESHOLD: u64 = 100; const RESPONDED_BIT: u64 = 1 << 2; @@ -24,10 +24,19 @@ struct TokenContext { address: Address, } +#[derive(Clone, Copy)] +struct RewardsContext { + token: TokenContext, + checked_at: u64, + synced_at: u64, + spendable: U256, + remaining: U256, +} + pub(crate) struct RewardsDistributor { client: Arc, state: Arc>, - token_context: HashMap, + rewards_context: HashMap, } impl RewardsDistributor { @@ -35,7 +44,7 @@ impl RewardsDistributor { Self { client, state, - token_context: Default::default(), + rewards_context: Default::default(), } } @@ -54,7 +63,7 @@ impl RewardsDistributor { .call() .await .context("Failed to get reward policy contract address")?; - let token = self.fetch_token_context(reward_policy_address).await?; + let token = self.fetch_token_context(reward_policy_address).await?.token; let reward_policy = self.client.reward_policy(reward_policy_address); let erc20 = self.client.erc20(token.address); @@ -78,6 +87,7 @@ impl RewardsDistributor { Ok(()) } + #[instrument(skip_all, fields(key = ?key.heartbeat_key, round = key.round))] pub(crate) async fn distribute_rewards( &mut self, block_timestamp: u64, @@ -85,33 +95,14 @@ impl RewardsDistributor { outcome: u8, members: Vec
, ) -> anyhow::Result<()> { - let cached_info = { - let state = self.state.lock().await; - state.rounds.get(&key).and_then(|round| round.round_info) - }; - let round_info = match cached_info { - Some(info) => info, - None => { - let info = self - .client - .heartbeat_manager() - .rounds(key.heartbeat_key, key.round) - .call() - .await?; - let view = RoundInfoView { - reward: info.reward, - valid_stake: info.validStake, - invalid_stake: info.invalidStake, - }; - let mut state = self.state.lock().await; - if let Some(round_state) = state.rounds.get_mut(&key) { - round_state.round_info = Some(view); - } - view - } - }; + let info = self + .client + .heartbeat_manager() + .rounds(key.heartbeat_key, key.round) + .call() + .await?; if !self - .ensure_reward_budget(block_timestamp, round_info.reward, key) + .ensure_reward_budget(block_timestamp, info.reward) .await? { return Ok(()); @@ -122,15 +113,13 @@ impl RewardsDistributor { .build_voter_list(key, &members, expected_verdict) .await?; let expected_stake = if outcome == 1 { - round_info.valid_stake + info.validStake } else { - round_info.invalid_stake + info.invalidStake }; if sum_weights != expected_stake { warn!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, sum_weights = ?sum_weights, expected_stake = ?expected_stake, "Reward weights mismatch, skipping" @@ -138,12 +127,7 @@ impl RewardsDistributor { return Ok(()); } - info!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, - voters = voters.len(), - "Distributing rewards" - ); + info!(voters = voters.len(), "Distributing rewards"); let call = self.client @@ -154,8 +138,6 @@ impl RewardsDistributor { Ok(pending) => { let receipt = pending.get_receipt().await?; info!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, tx_hash = ?receipt.transaction_hash, "Rewards distributed" ); @@ -172,170 +154,89 @@ impl RewardsDistributor { } } + #[instrument(skip_all, fields(reward = ?reward_address))] async fn ensure_reward_budget( &mut self, block_timestamp: u64, reward_address: Address, - key: RoundKey, ) -> anyhow::Result { if reward_address == Address::ZERO { - warn!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, - "Reward policy address is zero, skipping" - ); + warn!("Reward policy address is zero, skipping"); return Ok(false); } - let mut cache = { - let state = self.state.lock().await; - state - .reward_policies - .get(&reward_address) - .cloned() - .unwrap_or_else(RewardPolicyCache::new) - }; - let reward_policy = self.client.reward_policy(reward_address); - let mut budget = None; - if cache.last_checked_at == Some(block_timestamp) { - budget = cache.last_budget; - } - if budget.is_none() { - let fetched = reward_policy.spendableBudget().call().await?; - budget = Some(fetched); - cache.last_checked_at = Some(block_timestamp); - cache.last_budget = Some(fetched); - cache.last_remaining = None; + let ctx = self.fetch_token_context(reward_address).await?; + if ctx.checked_at != block_timestamp { + // Fetch the current spendable/remaining budgets + ctx.spendable = reward_policy.spendableBudget().call().await?; + ctx.remaining = reward_policy.streamRemaining().call().await?; + ctx.checked_at = block_timestamp; + metrics::get().l2.rewards.set_spendable(ctx.spendable); + metrics::get().l2.rewards.set_remaining(ctx.remaining); + + let spendable = format_units(ctx.spendable, ctx.token.decimals)?; + let remaining = format_units(ctx.remaining, ctx.token.decimals)?; + info!( + spendable = spendable, + remaining = remaining, + "Fetched contract context" + ); } - let budget = budget.unwrap_or(U256::ZERO); - metrics::get().l2.rewards.set_budget(budget); - if budget > U256::ZERO { - self.store_reward_cache(reward_address, cache).await; + if ctx.spendable > U256::ZERO { return Ok(true); } - let remaining = if let Some(value) = cache.last_remaining { - value - } else { - let value = reward_policy.streamRemaining().call().await?; - cache.last_remaining = Some(value); - value - }; - let token_ctx = self.fetch_token_context(reward_address).await?; - let should_unlock = if remaining > U256::ZERO { - self.can_unlock_budget( - &reward_policy, - remaining, - block_timestamp, - token_ctx.decimals, - ) - .await? - } else { - false - }; + let should_unlock = Self::can_unlock_budget( + &reward_policy, + ctx.remaining, + block_timestamp, + ctx.token.decimals, + ) + .await?; if !should_unlock { - info!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, - reward = ?reward_address, - "Reward budget still unlocking, skipping" - ); - self.store_reward_cache(reward_address, cache).await; + info!("Reward budget still unlocking, skipping"); return Ok(false); } - let already_attempted = { - let mut state = self.state.lock().await; - let entry = state.rounds.entry(key).or_default(); - if entry.reward_sync_attempted { - true - } else { - entry.reward_sync_attempted = true; - false - } - }; - if already_attempted { - debug!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, - reward = ?reward_address, - "Reward sync already attempted for round, skipping" - ); - self.store_reward_cache(reward_address, cache).await; + if ctx.synced_at == block_timestamp { + debug!("Reward sync already attempted for reward policy in this tick"); return Ok(false); } + ctx.synced_at = block_timestamp; - if cache.last_sync_attempt_at == Some(block_timestamp) { - debug!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, - reward = ?reward_address, - "Reward sync already attempted for reward policy in this tick" - ); - self.store_reward_cache(reward_address, cache).await; - return Ok(false); - } - cache.last_sync_attempt_at = Some(block_timestamp); - - info!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, - reward = ?reward_address, - "Reward budget unlocking, syncing policy", - ); - + info!("Reward budget unlocking, syncing policy",); match reward_policy.sync().send().await { Ok(pending) => { let receipt = pending.get_receipt().await?; info!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, - reward = ?reward_address, tx_hash = ?receipt.transaction_hash, "Reward policy synced" ); } Err(e) => { - warn!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, - reward = ?reward_address, - error = %decode_any_error(&e), - "Reward policy sync failed" - ); - self.store_reward_cache(reward_address, cache).await; + warn!("Reward policy sync failed: {}", decode_any_error(&e)); return Ok(false); } } - let budget_after = reward_policy.spendableBudget().call().await?; - if budget_after == U256::ZERO { - let remaining_after = reward_policy.streamRemaining().call().await?; - let skip_msg = if remaining_after > U256::ZERO { - "Reward budget still unlocking after sync, skipping" + // Update our state after syncing + ctx.spendable = reward_policy.spendableBudget().call().await?; + ctx.remaining = reward_policy.streamRemaining().call().await?; + if ctx.spendable == U256::ZERO { + if ctx.remaining > U256::ZERO { + info!("Reward budget still unlocking after sync, skipping"); } else { - "Reward budget still empty after sync, skipping" - }; - info!( - heartbeat_key = ?key.heartbeat_key, - round = key.round, - reward = ?reward_address, - "{}", skip_msg - ); - cache.last_budget = Some(budget_after); - self.store_reward_cache(reward_address, cache).await; - return Ok(false); + info!("Reward budget still empty after sync, skipping"); + } + Ok(false) + } else { + Ok(true) } - - cache.last_budget = Some(budget_after); - self.store_reward_cache(reward_address, cache).await; - Ok(true) } async fn can_unlock_budget( - &self, reward_policy: &RewardPolicyInstance, remaining: U256, block_timestamp: u64, @@ -348,6 +249,7 @@ impl RewardsDistributor { let stream_rate = reward_policy.streamRatePerSecondWad().call().await?; let last_update = reward_policy.lastUpdate().call().await?; let stream_end = reward_policy.streamEnd().call().await?; + metrics::get().l2.rewards.set_end(stream_end); if block_timestamp >= stream_end { return Ok(true); @@ -370,11 +272,6 @@ impl RewardsDistributor { Ok(unlocked >= threshold) } - async fn store_reward_cache(&self, reward_address: Address, cache: RewardPolicyCache) { - let mut state = self.state.lock().await; - state.reward_policies.insert(reward_address, cache); - } - async fn build_voter_list( &self, key: RoundKey, @@ -403,22 +300,33 @@ impl RewardsDistributor { Ok((voters, total_weight)) } - async fn fetch_token_context(&mut self, address: Address) -> anyhow::Result { - if let Some(context) = self.token_context.get(&address) { - return Ok(*context); + async fn fetch_token_context( + &mut self, + address: Address, + ) -> anyhow::Result<&mut RewardsContext> { + if !self.rewards_context.contains_key(&address) { + info!("Fetching token context for rewards policy address {address}"); + let reward_policy = RewardPolicyInstance::new(address, self.client.provider()); + let token_address = reward_policy.rewardToken().call().await?; + let erc20 = self.client.erc20(token_address); + let decimals = erc20.decimals().call().await?; + + let token = TokenContext { + decimals, + address: token_address, + }; + let context = RewardsContext { + token, + checked_at: 0, + synced_at: 0, + spendable: U256::ZERO, + remaining: U256::ZERO, + }; + self.rewards_context.insert(address, context); } - info!("Fetching token context for rewards policy address {address}"); - let reward_policy = RewardPolicyInstance::new(address, self.client.provider()); - let token_address = reward_policy.rewardToken().call().await?; - let erc20 = self.client.erc20(token_address); - let decimals = erc20.decimals().call().await?; - - let context = TokenContext { - decimals, - address: token_address, - }; - self.token_context.insert(address, context); - Ok(context) + self.rewards_context + .get_mut(&address) + .ok_or_else(|| anyhow!("insertion gone")) } } diff --git a/keeper/src/metrics.rs b/keeper/src/metrics.rs index 8b09ae9..97106cd 100644 --- a/keeper/src/metrics.rs +++ b/keeper/src/metrics.rs @@ -156,7 +156,9 @@ impl L2EventMetrics { pub(crate) struct L2RewardsMetrics { distribution: Counter, - budget: Gauge, + spendable: Gauge, + remaining: Gauge, + end: Gauge, } impl L2RewardsMetrics { @@ -165,13 +167,23 @@ impl L2RewardsMetrics { .u64_counter("blacklight.keeper.l2.rewards.distributions") .with_description("Number of times rewards were distributed") .build(); - let budget = meter - .f64_gauge("blacklight.keeper.l2.rewards.budget") - .with_description("The current spendable budget for rewards") + let spendable = meter + .f64_gauge("blacklight.keeper.l2.rewards.spendadble") + .with_description("The spendable budget for rewards") + .build(); + let remaining = meter + .f64_gauge("blacklight.keeper.l2.rewards.remaining") + .with_description("The remaining budget to be unlocked for rewards") + .build(); + let end = meter + .u64_gauge("blacklight.keeper.l2.rewards.end") + .with_description("The UNIX epoch timestamp when the current budget runs out") .build(); Self { distribution, - budget, + spendable, + remaining, + end, } } @@ -179,8 +191,16 @@ impl L2RewardsMetrics { self.distribution.add(1, &[]); } - pub(crate) fn set_budget(&self, value: U256) { - self.budget.record(value.into(), &[]); + pub(crate) fn set_spendable(&self, value: U256) { + self.spendable.record(value.into(), &[]); + } + + pub(crate) fn set_remaining(&self, value: U256) { + self.remaining.record(value.into(), &[]); + } + + pub(crate) fn set_end(&self, value: u64) { + self.end.record(value, &[]); } }