diff --git a/integration-tests/tests/chain_cache.rs b/integration-tests/tests/chain_cache.rs index 88899cbbe..ad895491f 100644 --- a/integration-tests/tests/chain_cache.rs +++ b/integration-tests/tests/chain_cache.rs @@ -54,10 +54,10 @@ mod chain_query_interface { chain_index::{ source::ValidatorConnector, types::{BestChainLocation, TransactionHash}, - NodeBackedChainIndex, NodeBackedChainIndexSubscriber, + Index, Subscriber, }, test_dependencies::{ - chain_index::{self, ChainIndex}, + chain_index::{self, Queryable}, BlockCacheConfig, }, Height, StateService, StateServiceConfig, ZcashService, @@ -79,8 +79,8 @@ mod chain_query_interface { TestManager, JsonRpSeeConnector, Option, - NodeBackedChainIndex, - NodeBackedChainIndexSubscriber, + Index, + Subscriber, ) { let (test_manager, json_service) = create_test_manager_and_connector( validator, @@ -150,7 +150,7 @@ mod chain_query_interface { test_manager.local_net.get_activation_heights().into(), ), }; - let chain_index = NodeBackedChainIndex::new( + let chain_index = Index::new( ValidatorConnector::State(chain_index::source::State { read_state_service: state_service.read_state_service().clone(), mempool_fetcher: json_service.clone(), @@ -187,12 +187,10 @@ mod chain_query_interface { test_manager.local_net.get_activation_heights().into(), ), }; - let chain_index = NodeBackedChainIndex::new( - ValidatorConnector::Fetch(json_service.clone()), - config, - ) - .await - .unwrap(); + let chain_index = + Index::new(ValidatorConnector::Fetch(json_service.clone()), config) + .await + .unwrap(); let index_reader = chain_index.subscriber().await; tokio::time::sleep(Duration::from_secs(3)).await; diff --git a/zaino-state/src/chain_index.rs b/zaino-state/src/chain_index.rs index db7358da8..217d1e34e 100644 --- a/zaino-state/src/chain_index.rs +++ b/zaino-state/src/chain_index.rs @@ -5,22 +5,21 @@ //! - NonFinalisedState: Holds block data for the top 100 blocks of all chains. //! - FinalisedState: Holds block data for the remainder of the best chain. //! -//! - Chain: Holds chain / block structs used internally by the ChainIndex. +//! - Chain: Holds chain / block structs used internally by the Queryable //! - Holds fields required to: //! - a. Serve CompactBlock data dirctly. //! - b. Build trasparent tx indexes efficiently //! - NOTE: Full transaction and block data is served from the backend finalizer. -use crate::chain_index::non_finalised_state::BestTip; +use crate::chain_index::snapshot::Snapshottable as _; use crate::chain_index::types::{BestChainLocation, NonBestChainLocation}; use crate::error::{ChainIndexError, ChainIndexErrorKind, FinalisedStateError}; -use crate::IndexedBlock; -use crate::{AtomicStatus, StatusType, SyncError}; +use crate::{AtomicStatus, NonfinalizedBlockCacheSnapshot, StatusType}; +use crate::{IndexedBlock, SyncError}; use std::collections::HashSet; use std::{sync::Arc, time::Duration}; use futures::{FutureExt, Stream}; -use non_finalised_state::NonfinalizedBlockCacheSnapshot; use source::{BlockchainSource, ValidatorConnector}; use tokio_stream::StreamExt; use tracing::info; @@ -36,6 +35,8 @@ pub mod finalised_state; pub mod mempool; /// State less than 100 blocks old, stored separately as it may be reorged pub mod non_finalised_state; +/// Snapshots of non_finalised_state +pub mod snapshot; /// BlockchainSource pub mod source; /// Common types used by the rest of this module @@ -44,15 +45,148 @@ pub mod types; #[cfg(test)] mod tests; +/// The combined index. Contains a view of the mempool, and the full +/// chain state, both finalized and non-finalized, to allow queries over +/// the entire chain at once. +/// +/// This is the primary implementation backing [`Queryable`] and replaces the functionality +/// previously provided by `FetchService` and `StateService`. It can be backed by either: +/// - A zebra `ReadStateService` for direct database access (preferred for performance) +/// - A JSON-RPC connection to any validator node (zcashd, zebrad, or another zainod) +/// +/// To use the [`Queryable`] trait methods, call [`subscriber()`](Index::subscriber) +/// to get a [`Subscriber`] which implements the trait. +/// +/// # Construction +/// +/// Use [`Index::new()`] with: +/// - A [`ValidatorConnector`] source (State variant preferred, Fetch as fallback) +/// - A [`crate::config::BlockCacheConfig`] containing cache and database settings +/// +/// # Example with StateService (Preferred) +/// +/// ```no_run +/// # async fn example() -> Result<(), Box> { +/// use zaino_state::{Index, ValidatorConnector, BlockCacheConfig}; +/// use zaino_state::chain_index::source::State; +/// use zaino_fetch::jsonrpsee::connector::JsonRpSeeConnector; +/// use zebra_state::{ReadStateService, Config as ZebraConfig}; +/// use std::path::PathBuf; +/// +/// // Create ReadStateService for direct database access +/// let zebra_config = ZebraConfig::default(); +/// let read_state_service = ReadStateService::new(&zebra_config).await?; +/// +/// // Temporary: Create JSON-RPC connector for mempool access +/// let mempool_connector = JsonRpSeeConnector::new_from_config_parts( +/// false, +/// "127.0.0.1:8232".parse()?, +/// "user".to_string(), +/// "password".to_string(), +/// None, +/// ).await?; +/// +/// let source = ValidatorConnector::State(State { +/// read_state_service, +/// mempool_fetcher: mempool_connector, +/// }); +/// +/// // Configure the cache (extract these from your previous StateServiceConfig) +/// let config = BlockCacheConfig { +/// map_capacity: Some(1000), +/// map_shard_amount: Some(16), +/// db_version: 1, +/// db_path: PathBuf::from("/path/to/cache"), +/// db_size: Some(10), // GB +/// network: zebra_chain::parameters::Network::Mainnet, +/// no_sync: false, +/// no_db: false, +/// }; +/// +/// let chain_index = Index::new(source, config).await?; +/// let subscriber = chain_index.subscriber().await; +/// +/// // Use the subscriber to access Queryable trait methods +/// let snapshot = subscriber.snapshot_nonfinalized_state(); +/// # Ok(()) +/// # } +/// ``` +/// +/// # Example with JSON-RPC Only (Fallback) +/// +/// ```no_run +/// # async fn example() -> Result<(), Box> { +/// use zaino_state::{Index, ValidatorConnector, BlockCacheConfig}; +/// use zaino_fetch::jsonrpsee::connector::JsonRpSeeConnector; +/// use std::path::PathBuf; +/// +/// // For JSON-RPC backend (replaces FetchService::spawn) +/// let connector = JsonRpSeeConnector::new_from_config_parts( +/// false, +/// "127.0.0.1:8232".parse()?, +/// "user".to_string(), +/// "password".to_string(), +/// None, +/// ).await?; +/// let source = ValidatorConnector::Fetch(connector); +/// +/// // Configure the cache (extract these from your previous FetchServiceConfig) +/// let config = BlockCacheConfig { +/// map_capacity: Some(1000), +/// map_shard_amount: Some(16), +/// db_version: 1, +/// db_path: PathBuf::from("/path/to/cache"), +/// db_size: Some(10), // GB +/// network: zebra_chain::parameters::Network::Mainnet, +/// no_sync: false, +/// no_db: false, +/// }; +/// +/// let chain_index = Index::new(source, config).await?; +/// let subscriber = chain_index.subscriber().await; +/// +/// // Use the subscriber to access Queryable trait methods +/// # Ok(()) +/// # } +/// ``` +/// +/// # Migration from StateService/FetchService +/// +/// If migrating from `StateService::spawn(config)`: +/// 1. Create a `ReadStateService` and temporary JSON-RPC connector for mempool +/// 2. Convert config to `BlockCacheConfig` (or use `From` impl) +/// 3. Call `Index::new(ValidatorConnector::State(...), block_config)` +/// +/// If migrating from `FetchService::spawn(config)`: +/// 1. Create a `JsonRpSeeConnector` using the RPC fields from your `FetchServiceConfig` +/// 2. Convert remaining config fields to `BlockCacheConfig` (or use `From` impl) +/// 3. Call `Index::new(ValidatorConnector::Fetch(connector), block_config)` +/// +/// # Current Features +/// +/// - Full mempool support including streaming and filtering +/// - Unified access to finalized and non-finalized blockchain state +/// - Automatic synchronization between state layers +/// - Snapshot-based consistency for queries +pub struct Index { + blockchain_source: std::sync::Arc, + #[allow(dead_code)] + mempool: std::sync::Arc>, + non_finalized_state: std::sync::Arc>, + finalized_db: std::sync::Arc, + sync_loop_handle: Option>>, + status: AtomicStatus, +} + /// The interface to the chain index. /// -/// `ChainIndex` provides a unified interface for querying blockchain data from different +/// `Queryable` provides a unified interface for querying blockchain data from different /// backend sources. It combines access to both finalized state (older than 100 blocks) and /// non-finalized state (recent blocks that may still be reorganized). /// /// # Implementation /// -/// The primary implementation is [`NodeBackedChainIndex`], which can be backed by either: +/// The primary implementation is [`Index`], which can be backed by either: /// - Direct read access to a zebrad database via `ReadStateService` (preferred) /// - A JSON-RPC connection to a validator node (zcashd, zebrad, or another zainod) /// @@ -60,7 +194,7 @@ mod tests; /// /// ```no_run /// # async fn example() -> Result<(), Box> { -/// use zaino_state::{ChainIndex, NodeBackedChainIndex, ValidatorConnector, BlockCacheConfig}; +/// use zaino_state::{Queryable, Index, ValidatorConnector, BlockCacheConfig}; /// use zaino_fetch::jsonrpsee::connector::JsonRpSeeConnector; /// use zebra_state::{ReadStateService, Config as ZebraConfig}; /// use std::path::PathBuf; @@ -97,13 +231,13 @@ mod tests; /// ); /// /// // Create the chain index and get a subscriber for queries -/// let chain_index = NodeBackedChainIndex::new(source, config).await?; -/// let subscriber = chain_index.subscriber().await; +/// let index = Index::new(source, config).await?; +/// let subscriber = index.subscriber().await; /// /// // Take a snapshot for consistent queries /// let snapshot = subscriber.snapshot_nonfinalized_state(); /// -/// // Query blocks in a range using the subscriber +/// // Queryable blocks in a range using the subscriber /// if let Some(stream) = subscriber.get_block_range( /// &snapshot, /// zaino_state::Height(100000), @@ -119,7 +253,7 @@ mod tests; /// /// ```no_run /// # async fn example() -> Result<(), Box> { -/// use zaino_state::{ChainIndex, NodeBackedChainIndex, ValidatorConnector, BlockCacheConfig}; +/// use zaino_state::{Queryable, Index, ValidatorConnector, BlockCacheConfig}; /// use zaino_fetch::jsonrpsee::connector::JsonRpSeeConnector; /// use std::path::PathBuf; /// @@ -132,7 +266,7 @@ mod tests; /// None, // no cookie path /// ).await?; /// -/// // Wrap the connector for use with ChainIndex +/// // Wrap the connector for use with Queryable /// let source = ValidatorConnector::Fetch(connector); /// /// // Configure the block cache (same as above) @@ -148,10 +282,10 @@ mod tests; /// ); /// /// // Create the chain index and get a subscriber for queries -/// let chain_index = NodeBackedChainIndex::new(source, config).await?; -/// let subscriber = chain_index.subscriber().await; +/// let index = Index::new(source, config).await?; +/// let subscriber = index.subscriber().await; /// -/// // Use the subscriber to access ChainIndex trait methods +/// // Use the subscriber to access Queryable trait methods /// let snapshot = subscriber.snapshot_nonfinalized_state(); /// # Ok(()) /// # } @@ -162,10 +296,10 @@ mod tests; /// If you were previously using `FetchService::spawn()` or `StateService::spawn()`: /// 1. Extract the relevant fields from your service config into a `BlockCacheConfig` /// 2. Create the appropriate `ValidatorConnector` variant (State or Fetch) -/// 3. Call `NodeBackedChainIndex::new(source, config).await` -pub trait ChainIndex { +/// 3. Call `Index::new(source, config).await` +pub trait Queryable { /// A snapshot of the nonfinalized state, needed for atomic access - type Snapshot: NonFinalizedSnapshot; + type Snapshot: snapshot::Snapshottable; /// How it can fail type Error; @@ -257,140 +391,7 @@ pub trait ChainIndex { ) -> Option, Self::Error>>>; } -/// The combined index. Contains a view of the mempool, and the full -/// chain state, both finalized and non-finalized, to allow queries over -/// the entire chain at once. -/// -/// This is the primary implementation backing [`ChainIndex`] and replaces the functionality -/// previously provided by `FetchService` and `StateService`. It can be backed by either: -/// - A zebra `ReadStateService` for direct database access (preferred for performance) -/// - A JSON-RPC connection to any validator node (zcashd, zebrad, or another zainod) -/// -/// To use the [`ChainIndex`] trait methods, call [`subscriber()`](NodeBackedChainIndex::subscriber) -/// to get a [`NodeBackedChainIndexSubscriber`] which implements the trait. -/// -/// # Construction -/// -/// Use [`NodeBackedChainIndex::new()`] with: -/// - A [`ValidatorConnector`] source (State variant preferred, Fetch as fallback) -/// - A [`crate::config::BlockCacheConfig`] containing cache and database settings -/// -/// # Example with StateService (Preferred) -/// -/// ```no_run -/// # async fn example() -> Result<(), Box> { -/// use zaino_state::{NodeBackedChainIndex, ValidatorConnector, BlockCacheConfig}; -/// use zaino_state::chain_index::source::State; -/// use zaino_fetch::jsonrpsee::connector::JsonRpSeeConnector; -/// use zebra_state::{ReadStateService, Config as ZebraConfig}; -/// use std::path::PathBuf; -/// -/// // Create ReadStateService for direct database access -/// let zebra_config = ZebraConfig::default(); -/// let read_state_service = ReadStateService::new(&zebra_config).await?; -/// -/// // Temporary: Create JSON-RPC connector for mempool access -/// let mempool_connector = JsonRpSeeConnector::new_from_config_parts( -/// false, -/// "127.0.0.1:8232".parse()?, -/// "user".to_string(), -/// "password".to_string(), -/// None, -/// ).await?; -/// -/// let source = ValidatorConnector::State(State { -/// read_state_service, -/// mempool_fetcher: mempool_connector, -/// }); -/// -/// // Configure the cache (extract these from your previous StateServiceConfig) -/// let config = BlockCacheConfig { -/// map_capacity: Some(1000), -/// map_shard_amount: Some(16), -/// db_version: 1, -/// db_path: PathBuf::from("/path/to/cache"), -/// db_size: Some(10), // GB -/// network: zebra_chain::parameters::Network::Mainnet, -/// no_sync: false, -/// no_db: false, -/// }; -/// -/// let chain_index = NodeBackedChainIndex::new(source, config).await?; -/// let subscriber = chain_index.subscriber().await; -/// -/// // Use the subscriber to access ChainIndex trait methods -/// let snapshot = subscriber.snapshot_nonfinalized_state(); -/// # Ok(()) -/// # } -/// ``` -/// -/// # Example with JSON-RPC Only (Fallback) -/// -/// ```no_run -/// # async fn example() -> Result<(), Box> { -/// use zaino_state::{NodeBackedChainIndex, ValidatorConnector, BlockCacheConfig}; -/// use zaino_fetch::jsonrpsee::connector::JsonRpSeeConnector; -/// use std::path::PathBuf; -/// -/// // For JSON-RPC backend (replaces FetchService::spawn) -/// let connector = JsonRpSeeConnector::new_from_config_parts( -/// false, -/// "127.0.0.1:8232".parse()?, -/// "user".to_string(), -/// "password".to_string(), -/// None, -/// ).await?; -/// let source = ValidatorConnector::Fetch(connector); -/// -/// // Configure the cache (extract these from your previous FetchServiceConfig) -/// let config = BlockCacheConfig { -/// map_capacity: Some(1000), -/// map_shard_amount: Some(16), -/// db_version: 1, -/// db_path: PathBuf::from("/path/to/cache"), -/// db_size: Some(10), // GB -/// network: zebra_chain::parameters::Network::Mainnet, -/// no_sync: false, -/// no_db: false, -/// }; -/// -/// let chain_index = NodeBackedChainIndex::new(source, config).await?; -/// let subscriber = chain_index.subscriber().await; -/// -/// // Use the subscriber to access ChainIndex trait methods -/// # Ok(()) -/// # } -/// ``` -/// -/// # Migration from StateService/FetchService -/// -/// If migrating from `StateService::spawn(config)`: -/// 1. Create a `ReadStateService` and temporary JSON-RPC connector for mempool -/// 2. Convert config to `BlockCacheConfig` (or use `From` impl) -/// 3. Call `NodeBackedChainIndex::new(ValidatorConnector::State(...), block_config)` -/// -/// If migrating from `FetchService::spawn(config)`: -/// 1. Create a `JsonRpSeeConnector` using the RPC fields from your `FetchServiceConfig` -/// 2. Convert remaining config fields to `BlockCacheConfig` (or use `From` impl) -/// 3. Call `NodeBackedChainIndex::new(ValidatorConnector::Fetch(connector), block_config)` -/// -/// # Current Features -/// -/// - Full mempool support including streaming and filtering -/// - Unified access to finalized and non-finalized blockchain state -/// - Automatic synchronization between state layers -/// - Snapshot-based consistency for queries -pub struct NodeBackedChainIndex { - blockchain_source: std::sync::Arc, - #[allow(dead_code)] - mempool: std::sync::Arc>, - non_finalized_state: std::sync::Arc>, - finalized_db: std::sync::Arc, - sync_loop_handle: Option>>, - status: AtomicStatus, -} - -impl NodeBackedChainIndex { +impl Index { /// Creates a new chainindex from a connection to a validator /// Currently this is a ReadStateService or JsonRpSeeConnector pub async fn new( @@ -419,7 +420,7 @@ impl NodeBackedChainIndex { ) .await?; - let mut chain_index = Self { + let mut index = Self { blockchain_source: Arc::new(source), mempool: std::sync::Arc::new(mempool_state), non_finalized_state: std::sync::Arc::new(non_finalized_state), @@ -427,15 +428,17 @@ impl NodeBackedChainIndex { sync_loop_handle: None, status: AtomicStatus::new(StatusType::Spawning), }; - chain_index.sync_loop_handle = Some(chain_index.start_sync_loop()); + index.sync_loop_handle = Some(index.start_sync_loop()); - Ok(chain_index) + Ok(index) } - /// Creates a [`NodeBackedChainIndexSubscriber`] from self, + /// Creates a [`Subscriber`] from self, /// a clone-safe, drop-safe, read-only view onto the running indexer. - pub async fn subscriber(&self) -> NodeBackedChainIndexSubscriber { - NodeBackedChainIndexSubscriber { + /// This relatively simple name presumes the developer attends + /// to the containing mod name [`crate::chain_index`]. + pub async fn subscriber(&self) -> Subscriber { + Subscriber { blockchain_source: self.blockchain_source.as_ref().clone(), mempool: self.mempool.subscriber(), non_finalized_state: self.non_finalized_state.clone(), @@ -454,7 +457,7 @@ impl NodeBackedChainIndex { Ok(()) } - /// Displays the status of the chain_index + /// Displays the status of the index pub fn status(&self) -> StatusType { let finalized_status = self.finalized_db.status(); let mempool_status = self.mempool.status(); @@ -468,7 +471,7 @@ impl NodeBackedChainIndex { } pub(super) fn start_sync_loop(&self) -> tokio::task::JoinHandle> { - info!("Starting ChainIndex sync."); + info!("Starting Index sync."); let nfs = self.non_finalized_state.clone(); let fs = self.finalized_db.clone(); let status = self.status.clone(); @@ -527,13 +530,13 @@ impl NodeBackedChainIndex { } } -/// A clone-safe *read-only* view onto a running [`NodeBackedChainIndex`]. +/// A clone-safe *read-only* view onto a running [`Index`]. /// /// Designed for concurrent efficiency. /// -/// [`NodeBackedChainIndexSubscriber`] can safely be cloned and dropped freely. +/// [`Subscriber`] can safely be cloned and dropped freely. #[derive(Clone)] -pub struct NodeBackedChainIndexSubscriber { +pub struct Subscriber { blockchain_source: Source, mempool: mempool::MempoolSubscriber, non_finalized_state: std::sync::Arc>, @@ -541,8 +544,8 @@ pub struct NodeBackedChainIndexSubscriber NodeBackedChainIndexSubscriber { - /// Displays the status of the chain_index +impl Subscriber { + /// Displays the status of the index pub fn status(&self) -> StatusType { let finalized_status = self.finalized_state.status(); let mempool_status = self.mempool.status(); @@ -612,7 +615,7 @@ impl NodeBackedChainIndexSubscriber { } } -impl ChainIndex for NodeBackedChainIndexSubscriber { +impl Queryable for Subscriber { type Snapshot = Arc; type Error = ChainIndexError; @@ -774,7 +777,7 @@ impl ChainIndex for NodeBackedChainIndexSubscriber ChainIndex for NodeBackedChainIndexSubscriber NonFinalizedSnapshot for Arc -where - T: NonFinalizedSnapshot, -{ - fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock> { - self.as_ref().get_chainblock_by_hash(target_hash) - } - - fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock> { - self.as_ref().get_chainblock_by_height(target_height) - } - - fn best_chaintip(&self) -> BestTip { - self.as_ref().best_chaintip() - } -} - -/// A snapshot of the non-finalized state, for consistent queries -pub trait NonFinalizedSnapshot { - /// Hash -> block - fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock>; - /// Height -> block - fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock>; - /// Get the tip of the best chain, according to the snapshot - fn best_chaintip(&self) -> BestTip; -} - -impl NonFinalizedSnapshot for NonfinalizedBlockCacheSnapshot { - fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock> { - self.blocks.iter().find_map(|(hash, chainblock)| { - if hash == target_hash { - Some(chainblock) - } else { - None - } - }) - } - fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock> { - self.heights_to_hashes.iter().find_map(|(height, hash)| { - if height == target_height { - self.get_chainblock_by_hash(hash) - } else { - None - } - }) - } - - fn best_chaintip(&self) -> BestTip { - self.best_tip - } -} diff --git a/zaino-state/src/chain_index/non_finalised_state.rs b/zaino-state/src/chain_index/non_finalised_state.rs index e312cf4c2..998ef03b6 100644 --- a/zaino-state/src/chain_index/non_finalised_state.rs +++ b/zaino-state/src/chain_index/non_finalised_state.rs @@ -1,8 +1,11 @@ -use super::{finalised_state::ZainoDB, source::BlockchainSource}; use crate::{ - chain_index::types::{self, BlockHash, BlockMetadata, BlockWithMetadata, Height, TreeRootData}, + chain_index::{ + finalised_state::ZainoDB, + source::BlockchainSourceError, + types::{self, BlockHash, BlockMetadata, BlockWithMetadata, Height, TreeRootData}, + }, error::FinalisedStateError, - ChainWork, IndexedBlock, + BlockchainSource, ChainWork, IndexedBlock, }; use arc_swap::ArcSwap; use futures::lock::Mutex; @@ -17,7 +20,7 @@ use zebra_state::HashOrHeight; pub struct NonFinalizedState { /// We need access to the validator's best block hash, as well /// as a source of blocks - pub(super) source: Source, + pub(crate) source: Source, staged: Mutex>, staging_sender: mpsc::Sender, /// This lock should not be exposed to consumers. Rather, @@ -286,7 +289,7 @@ impl NonFinalizedState { } /// sync to the top of the chain, trimming to the finalised tip. - pub(super) async fn sync(&self, finalized_db: Arc) -> Result<(), SyncError> { + pub(crate) async fn sync(&self, finalized_db: Arc) -> Result<(), SyncError> { let initial_state = self.get_snapshot(); let mut nonbest_blocks = HashMap::new(); @@ -660,7 +663,7 @@ impl NonFinalizedState { } /// Get a snapshot of the block cache - pub(super) fn get_snapshot(&self) -> Arc { + pub(crate) fn get_snapshot(&self) -> Arc { self.current.load_full() } @@ -695,7 +698,7 @@ impl NonFinalizedState { async fn get_tree_roots_from_source( &self, block_hash: BlockHash, - ) -> Result { + ) -> Result { let (sapling_root_and_len, orchard_root_and_len) = self.source.get_commitment_tree_roots(block_hash).await?; diff --git a/zaino-state/src/chain_index/snapshot.rs b/zaino-state/src/chain_index/snapshot.rs new file mode 100644 index 000000000..022c3fbe8 --- /dev/null +++ b/zaino-state/src/chain_index/snapshot.rs @@ -0,0 +1,59 @@ +//! This mod provides a readable view of the index state at a given point in time. +use crate::chain_index::non_finalised_state::{BestTip, NonfinalizedBlockCacheSnapshot}; +/// State that has not been confirmed by at least 100 blocks. +use crate::chain_index::types; +use crate::IndexedBlock; + +use std::sync::Arc; + +impl Snapshottable for Arc +where + T: Snapshottable, +{ + fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock> { + self.as_ref().get_chainblock_by_hash(target_hash) + } + + fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock> { + self.as_ref().get_chainblock_by_height(target_height) + } + + fn best_chaintip(&self) -> BestTip { + self.as_ref().best_chaintip() + } +} + +/// A snapshot of the non-finalized state, for consistent queries +pub trait Snapshottable { + /// Hash -> block + fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock>; + /// Height -> block + fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock>; + /// Get the tip of the best chain, according to the snapshot + fn best_chaintip(&self) -> BestTip; +} + +impl Snapshottable for NonfinalizedBlockCacheSnapshot { + fn get_chainblock_by_hash(&self, target_hash: &types::BlockHash) -> Option<&IndexedBlock> { + self.blocks.iter().find_map(|(hash, chainblock)| { + if hash == target_hash { + Some(chainblock) + } else { + None + } + }) + } + fn get_chainblock_by_height(&self, target_height: &types::Height) -> Option<&IndexedBlock> { + self.heights_to_hashes.iter().find_map(|(height, hash)| { + if height == target_height { + self.get_chainblock_by_hash(hash) + } else { + None + } + }) + } + + fn best_chaintip(&self) -> BestTip { + self.best_tip + } +} diff --git a/zaino-state/src/chain_index/tests.rs b/zaino-state/src/chain_index/tests.rs index e0d59edff..b05e36053 100644 --- a/zaino-state/src/chain_index/tests.rs +++ b/zaino-state/src/chain_index/tests.rs @@ -32,7 +32,7 @@ mod mockchain_tests { TestVectorBlockData, }, types::{BestChainLocation, TransactionHash}, - ChainIndex, NodeBackedChainIndex, NodeBackedChainIndexSubscriber, + Index, Queryable, Subscriber, }, BlockCacheConfig, }; @@ -41,8 +41,8 @@ mod mockchain_tests { active_mockchain_source: bool, ) -> ( Vec, - NodeBackedChainIndex, - NodeBackedChainIndexSubscriber, + Index, + Subscriber, MockchainSource, ) { super::init_tracing(); @@ -75,9 +75,7 @@ mod mockchain_tests { network: Network::Regtest(ActivationHeights::default()), }; - let indexer = NodeBackedChainIndex::new(source.clone(), config) - .await - .unwrap(); + let indexer = Index::new(source.clone(), config).await.unwrap(); let index_reader = indexer.subscriber().await; loop { @@ -105,7 +103,7 @@ mod mockchain_tests { let start = crate::Height(0); let indexer_blocks = - ChainIndex::get_block_range(&index_reader, &nonfinalized_snapshot, start, None) + Queryable::get_block_range(&index_reader, &nonfinalized_snapshot, start, None) .unwrap() .collect::>() .await; diff --git a/zaino-state/src/lib.rs b/zaino-state/src/lib.rs index eb676fedd..59a619d38 100644 --- a/zaino-state/src/lib.rs +++ b/zaino-state/src/lib.rs @@ -31,7 +31,8 @@ pub use backends::{ pub mod chain_index; // Core ChainIndex trait and implementations -pub use chain_index::{ChainIndex, NodeBackedChainIndex, NodeBackedChainIndexSubscriber}; +pub use chain_index::Queryable as ChainIndexQueryable; +pub use chain_index::{Index, Subscriber}; // Source types for ChainIndex backends pub use chain_index::source::{BlockchainSource, State, ValidatorConnector}; // Supporting types @@ -41,6 +42,7 @@ pub use chain_index::non_finalised_state::{ InitError, NodeConnectionError, NonFinalizedState, NonfinalizedBlockCacheSnapshot, SyncError, UpdateError, }; +pub use chain_index::snapshot::Snapshottable; // NOTE: Should these be pub at all? pub use chain_index::types::{ AddrHistRecord, AddrScript, BlockData, BlockHash, BlockHeaderData, BlockIndex, BlockMetadata, diff --git a/zaino-testutils/src/lib.rs b/zaino-testutils/src/lib.rs index 34e2ac15b..e9d7cb666 100644 --- a/zaino-testutils/src/lib.rs +++ b/zaino-testutils/src/lib.rs @@ -22,8 +22,8 @@ use zaino_common::{ }; use zaino_serve::server::config::{GrpcServerConfig, JsonRpcServerConfig}; use zaino_state::{ - chain_index::NonFinalizedSnapshot, BackendType, ChainIndex, LightWalletIndexer, - LightWalletService, NodeBackedChainIndexSubscriber, ZcashIndexer, ZcashService, + BackendType, ChainIndexQueryable as _, LightWalletIndexer, LightWalletService, + Snapshottable as _, Subscriber, ZcashIndexer, ZcashService, }; use zainodlib::{config::ZainodConfig, error::IndexerError, indexer::Indexer}; pub use zcash_local_net as services; @@ -657,11 +657,7 @@ where } /// Generate `n` blocks for the local network and poll zaino's chain index until the chain index is synced to the target height. - pub async fn generate_blocks_and_poll_chain_index( - &self, - n: u32, - chain_index: &NodeBackedChainIndexSubscriber, - ) { + pub async fn generate_blocks_and_poll_chain_index(&self, n: u32, chain_index: &Subscriber) { let chain_height = self.local_net.get_chain_height().await; let mut next_block_height = u32::from(chain_height) + 1; let mut interval = tokio::time::interval(std::time::Duration::from_millis(200));