diff --git a/Cargo.toml b/Cargo.toml index d39f644..c34aab4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,7 @@ alloy-primitives = "0.8" alloy-consensus = "0.9" alloy-rlp = "0.3" alloy-eips = "0.9" +rusqlite = { version = "0.38", features = ["bundled"] } [workspace.lints.rust] # missing_docs = "deny" diff --git a/bin/evd/src/main.rs b/bin/evd/src/main.rs index 724b89c..74af602 100644 --- a/bin/evd/src/main.rs +++ b/bin/evd/src/main.rs @@ -216,11 +216,21 @@ fn run_node(config: NodeConfig, genesis_config: Option) { // Create shared mempool let mempool: SharedMempool> = new_shared_mempool(); - // Create chain index (shared between RPC and block callback) - let chain_index = Arc::new(PersistentChainIndex::new(Arc::new(storage.clone()))); - if let Err(e) = chain_index.initialize() { - tracing::warn!("Failed to initialize chain index: {:?}", e); - } + // Create chain index backed by SQLite (only when needed) + let chain_index = if config.rpc.enabled || config.rpc.enable_block_indexing { + let chain_index_db_path = + std::path::PathBuf::from(&config.storage.path).join("chain-index.sqlite"); + let index = Arc::new( + PersistentChainIndex::new(&chain_index_db_path) + .expect("failed to open chain index database"), + ); + if let Err(e) = index.initialize() { + tracing::warn!("Failed to initialize chain index: {:?}", e); + } + Some(index) + } else { + None + }; // Set up JSON-RPC server if enabled let rpc_handle = if config.rpc.enabled { @@ -235,7 +245,7 @@ fn run_node(config: NodeConfig, genesis_config: Option) { }; let state_provider = ChainStateProvider::with_mempool( - Arc::clone(&chain_index), + Arc::clone(chain_index.as_ref().expect("chain index required for RPC")), state_provider_config, codes_for_rpc, mempool.clone(), @@ -267,7 +277,7 @@ fn run_node(config: NodeConfig, genesis_config: Option) { // Build the OnBlockExecuted callback: commits state to storage + indexes blocks let storage_for_callback = storage.clone(); - let chain_index_for_callback = 
Arc::clone(&chain_index); + let chain_index_for_callback = chain_index.clone(); let parent_hash_for_callback = Arc::clone(&parent_hash); let current_height_for_callback = Arc::clone(¤t_height); let callback_chain_id = config.chain.chain_id; @@ -315,20 +325,20 @@ fn run_node(config: NodeConfig, genesis_config: Option) { let (stored_block, stored_txs, stored_receipts) = build_index_data(&block, &info.block_result, &metadata); - if callback_indexing_enabled { - if let Err(e) = chain_index_for_callback.store_block( - stored_block, - stored_txs, - stored_receipts, - ) { - tracing::warn!("Failed to index block {}: {:?}", info.height, e); - } else { - tracing::debug!( - "Indexed block {} (hash={}, state_root={})", - info.height, - block_hash, - state_root - ); + if let Some(ref chain_index) = chain_index_for_callback { + if callback_indexing_enabled { + if let Err(e) = + chain_index.store_block(stored_block, stored_txs, stored_receipts) + { + tracing::warn!("Failed to index block {}: {:?}", info.height, e); + } else { + tracing::debug!( + "Indexed block {} (hash={}, state_root={})", + info.height, + block_hash, + state_root + ); + } } } diff --git a/crates/app/node/src/lib.rs b/crates/app/node/src/lib.rs index c94dbe8..dd690f5 100644 --- a/crates/app/node/src/lib.rs +++ b/crates/app/node/src/lib.rs @@ -257,6 +257,7 @@ pub fn run_dev_node_with_rpc< path: data_dir.to_path_buf(), ..Default::default() }; + let chain_index_db_path = data_dir.join("chain-index.sqlite"); let runtime_config = TokioConfig::default() .with_storage_directory(data_dir) @@ -277,6 +278,7 @@ pub fn run_dev_node_with_rpc< let run_genesis = Arc::clone(&run_genesis); let build_storage = Arc::clone(&build_storage); let rpc_config = rpc_config.clone(); + let chain_index_db_path = chain_index_db_path.clone(); async move { // Clone context early since build_storage takes ownership @@ -323,10 +325,11 @@ pub fn run_dev_node_with_rpc< // Set up RPC infrastructure if enabled let rpc_handle = if rpc_config.enabled 
{ - // Create chain index for RPC queries - // Clone storage since both DevConsensus and ChainIndex need access - let storage_for_index = storage.clone(); - let chain_index = Arc::new(PersistentChainIndex::new(Arc::new(storage_for_index))); + // Create chain index backed by SQLite + let chain_index = Arc::new( + PersistentChainIndex::new(&chain_index_db_path) + .expect("failed to open chain index database"), + ); // Initialize from existing data if let Err(e) = chain_index.initialize() { @@ -366,7 +369,7 @@ pub fn run_dev_node_with_rpc< .expect("failed to start RPC server"); // Create DevConsensus with RPC support - let dev: Arc>> = Arc::new( + let dev: Arc> = Arc::new( DevConsensus::with_rpc( stf, storage, @@ -683,6 +686,7 @@ pub fn run_dev_node_with_rpc_and_mempool_eth< path: data_dir.to_path_buf(), ..Default::default() }; + let chain_index_db_path = data_dir.join("chain-index.sqlite"); let runtime_config = TokioConfig::default() .with_storage_directory(data_dir) @@ -703,6 +707,7 @@ pub fn run_dev_node_with_rpc_and_mempool_eth< let run_genesis = Arc::clone(&run_genesis); let build_storage = Arc::clone(&build_storage); let rpc_config = rpc_config.clone(); + let chain_index_db_path = chain_index_db_path.clone(); async move { let context_for_shutdown = context.clone(); @@ -747,8 +752,10 @@ pub fn run_dev_node_with_rpc_and_mempool_eth< let mempool: SharedMempool> = new_shared_mempool(); let rpc_handle = if rpc_config.enabled { - let storage_for_index = storage.clone(); - let chain_index = Arc::new(PersistentChainIndex::new(Arc::new(storage_for_index))); + let chain_index = Arc::new( + PersistentChainIndex::new(&chain_index_db_path) + .expect("failed to open chain index database"), + ); if let Err(e) = chain_index.initialize() { tracing::warn!("Failed to initialize chain index: {:?}", e); @@ -784,7 +791,7 @@ pub fn run_dev_node_with_rpc_and_mempool_eth< .await .expect("failed to start RPC server"); - let dev: Arc>> = + let dev: Arc> = Arc::new( 
DevConsensus::with_rpc_and_mempool( stf, diff --git a/crates/rpc/chain-index/Cargo.toml b/crates/rpc/chain-index/Cargo.toml index 283f862..0ca4f60 100644 --- a/crates/rpc/chain-index/Cargo.toml +++ b/crates/rpc/chain-index/Cargo.toml @@ -16,7 +16,6 @@ archive = [] evolve_core.workspace = true evolve_stf_traits.workspace = true evolve_stf.workspace = true -evolve_storage.workspace = true evolve_rpc_types.workspace = true evolve_eth_jsonrpc.workspace = true evolve_mempool.workspace = true @@ -34,9 +33,13 @@ parking_lot = "0.12" lru = "0.12" sha2.workspace = true futures = "0.3" +rusqlite.workspace = true +r2d2 = "0.8" +r2d2_sqlite = "0.32" [dev-dependencies] proptest = "1.4" +tempfile = "3" [lints] workspace = true diff --git a/crates/rpc/chain-index/proptest-regressions/index.txt b/crates/rpc/chain-index/proptest-regressions/index.txt new file mode 100644 index 0000000..755fbea --- /dev/null +++ b/crates/rpc/chain-index/proptest-regressions/index.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 0df5a6988f79f296c9ef1b47df11465b35b7c176c5e9e6785964bf3bf564067f # shrinks to block_number = 0, tx_count1 = 1, tx_count2 = 0 diff --git a/crates/rpc/chain-index/src/error.rs b/crates/rpc/chain-index/src/error.rs index 4688759..a764fa1 100644 --- a/crates/rpc/chain-index/src/error.rs +++ b/crates/rpc/chain-index/src/error.rs @@ -44,6 +44,10 @@ pub enum ChainIndexError { /// Index is empty (no blocks stored). #[error("chain index is empty")] EmptyIndex, + + /// SQLite database error. 
+ #[error("sqlite error: {0}")] + Sqlite(String), } impl From for ChainIndexError { @@ -58,5 +62,11 @@ impl From for ChainIndexError { } } +impl From for ChainIndexError { + fn from(err: rusqlite::Error) -> Self { + ChainIndexError::Sqlite(err.to_string()) + } +} + /// Result type for chain indexing operations. pub type ChainIndexResult = Result; diff --git a/crates/rpc/chain-index/src/index.rs b/crates/rpc/chain-index/src/index.rs index 4edfc11..62a8e4e 100644 --- a/crates/rpc/chain-index/src/index.rs +++ b/crates/rpc/chain-index/src/index.rs @@ -1,123 +1,19 @@ //! Chain index trait and persistent implementation. //! //! The `ChainIndex` trait defines operations for storing and retrieving chain data. -//! `PersistentChainIndex` implements this trait using the storage layer. -//! -//! Note: This uses synchronous methods to avoid Send bound issues with the underlying -//! storage. Reads are already synchronous, and writes use `futures::executor::block_on`. +//! `PersistentChainIndex` implements this trait using a SQLite database with a +//! connection pool (r2d2) for concurrent reads and a dedicated writer connection. -use std::sync::Arc; +use std::sync::Mutex; use alloy_primitives::B256; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; +use rusqlite::{params, Connection}; use crate::cache::ChainCache; use crate::error::{ChainIndexError, ChainIndexResult}; use crate::types::{StoredBlock, StoredLog, StoredReceipt, StoredTransaction, TxLocation}; -use evolve_storage::Storage; - -/// Maximum number of transaction hashes per storage chunk. -const TX_HASH_CHUNK_SIZE: usize = 32; - -/// Maximum number of log entries per storage chunk. -const LOG_CHUNK_SIZE: usize = 16; - -/// Storage key prefixes for chain data. 
-mod keys { - use alloy_primitives::B256; - - /// Block header by number: `blk:h:{number}` -> StoredBlock - pub const BLOCK_HEADER: &[u8] = b"blk:h:"; - /// Block number by hash: `blk:n:{hash}` -> u64 - pub const BLOCK_NUMBER: &[u8] = b"blk:n:"; - /// Transaction hashes in block: `blk:t:{number}` -> Vec - pub const BLOCK_TXS: &[u8] = b"blk:t:"; - /// Chunked transaction hashes count by block: `blk:t:c:{number}` -> u32 - pub const BLOCK_TXS_CHUNK_COUNT: &[u8] = b"blk:t:c:"; - /// Chunked transaction hashes by block/index: `blk:t:k:{number}:{idx}` -> Vec - pub const BLOCK_TXS_CHUNK: &[u8] = b"blk:t:k:"; - /// Transaction data by hash: `tx:d:{hash}` -> StoredTransaction - pub const TX_DATA: &[u8] = b"tx:d:"; - /// Transaction location: `tx:l:{hash}` -> TxLocation - pub const TX_LOCATION: &[u8] = b"tx:l:"; - /// Receipt by tx hash: `tx:r:{hash}` -> StoredReceipt - pub const TX_RECEIPT: &[u8] = b"tx:r:"; - /// Logs by block: `log:b:{number}` -> Vec - pub const LOGS_BY_BLOCK: &[u8] = b"log:b:"; - /// Chunked logs count by block: `log:b:c:{number}` -> u32 - pub const LOGS_BY_BLOCK_CHUNK_COUNT: &[u8] = b"log:b:c:"; - /// Chunked logs by block/index: `log:b:k:{number}:{idx}` -> Vec - pub const LOGS_BY_BLOCK_CHUNK: &[u8] = b"log:b:k:"; - /// Latest block number: `meta:latest` -> u64 - pub const META_LATEST: &[u8] = b"meta:latest"; - - pub fn block_header_key(number: u64) -> Vec { - let mut key = BLOCK_HEADER.to_vec(); - key.extend_from_slice(&number.to_be_bytes()); - key - } - - pub fn block_number_key(hash: &B256) -> Vec { - let mut key = BLOCK_NUMBER.to_vec(); - key.extend_from_slice(hash.as_slice()); - key - } - - pub fn block_txs_key(number: u64) -> Vec { - let mut key = BLOCK_TXS.to_vec(); - key.extend_from_slice(&number.to_be_bytes()); - key - } - - pub fn block_txs_chunk_count_key(number: u64) -> Vec { - let mut key = BLOCK_TXS_CHUNK_COUNT.to_vec(); - key.extend_from_slice(&number.to_be_bytes()); - key - } - - pub fn block_txs_chunk_key(number: u64, chunk_index: u32) 
-> Vec { - let mut key = BLOCK_TXS_CHUNK.to_vec(); - key.extend_from_slice(&number.to_be_bytes()); - key.extend_from_slice(&chunk_index.to_be_bytes()); - key - } - - pub fn tx_data_key(hash: &B256) -> Vec { - let mut key = TX_DATA.to_vec(); - key.extend_from_slice(hash.as_slice()); - key - } - - pub fn tx_location_key(hash: &B256) -> Vec { - let mut key = TX_LOCATION.to_vec(); - key.extend_from_slice(hash.as_slice()); - key - } - - pub fn tx_receipt_key(hash: &B256) -> Vec { - let mut key = TX_RECEIPT.to_vec(); - key.extend_from_slice(hash.as_slice()); - key - } - - pub fn logs_by_block_key(number: u64) -> Vec { - let mut key = LOGS_BY_BLOCK.to_vec(); - key.extend_from_slice(&number.to_be_bytes()); - key - } - - pub fn logs_by_block_chunk_count_key(number: u64) -> Vec { - let mut key = LOGS_BY_BLOCK_CHUNK_COUNT.to_vec(); - key.extend_from_slice(&number.to_be_bytes()); - key - } - - pub fn logs_by_block_chunk_key(number: u64, chunk_index: u32) -> Vec { - let mut key = LOGS_BY_BLOCK_CHUNK.to_vec(); - key.extend_from_slice(&number.to_be_bytes()); - key.extend_from_slice(&chunk_index.to_be_bytes()); - key - } -} /// Trait for chain data indexing operations. /// @@ -160,24 +56,87 @@ pub trait ChainIndex: Send + Sync { ) -> ChainIndexResult<()>; } -/// Persistent chain index backed by storage. -pub struct PersistentChainIndex { - storage: Arc, +/// Persistent chain index backed by SQLite. +/// +/// Uses a connection pool for concurrent reads and a dedicated writer connection +/// for serialized writes. SQLite WAL mode allows readers to proceed without +/// blocking the writer and vice versa. +pub struct PersistentChainIndex { + /// Connection pool for read operations (concurrent). + read_pool: Pool, + /// Dedicated connection for write operations (serialized). + writer: Mutex, cache: ChainCache, } -impl PersistentChainIndex { - /// Create a new persistent chain index. 
-    pub fn new(storage: Arc) -> Self {
-        Self {
-            storage,
+/// Configure a connection with standard PRAGMAs for WAL mode.
+fn configure_connection(conn: &Connection) -> Result<(), rusqlite::Error> {
+    conn.execute_batch(
+        "PRAGMA journal_mode=WAL;
+         PRAGMA synchronous=NORMAL;
+         PRAGMA foreign_keys=ON;",
+    )
+}
+
+impl PersistentChainIndex {
+    /// Create a new persistent chain index backed by an on-disk SQLite database.
+    pub fn new(db_path: impl AsRef<std::path::Path>) -> ChainIndexResult<Self> {
+        // Writer connection -- dedicated for store_block
+        let writer = Connection::open(&db_path)?;
+        configure_connection(&writer)?;
+
+        // Read pool -- concurrent read-only connections
+        let manager = SqliteConnectionManager::file(&db_path)
+            .with_flags(
+                rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
+                    | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
+            )
+            .with_init(|conn| configure_connection(conn));
+        let read_pool = Pool::builder()
+            .max_size(4)
+            .build(manager)
+            .map_err(|e| ChainIndexError::Sqlite(e.to_string()))?;
+
+        let index = Self {
+            read_pool,
+            writer: Mutex::new(writer),
             cache: ChainCache::with_defaults(),
-        }
+        };
+        index.init_schema()?;
+        Ok(index)
     }
 
-    /// Create with custom cache configuration.
-    pub fn with_cache(storage: Arc, cache: ChainCache) -> Self {
-        Self { storage, cache }
+    /// Create an in-memory chain index for testing.
+    ///
+    /// In-memory SQLite DBs are per-connection, so tests use a single shared
+    /// connection for both reads and writes via a file-based shared cache URI.
+    pub fn in_memory() -> ChainIndexResult<Self> {
+        // Use a named in-memory DB with shared cache so all connections see the same data.
+ let uri = format!("file:test_{}?mode=memory&cache=shared", unique_id()); + let writer = Connection::open(&uri)?; + configure_connection(&writer)?; + + let manager = + SqliteConnectionManager::file(&uri).with_init(|conn| configure_connection(conn)); + let read_pool = Pool::builder() + .max_size(2) + .build(manager) + .map_err(|e| ChainIndexError::Sqlite(e.to_string()))?; + + let index = Self { + read_pool, + writer: Mutex::new(writer), + cache: ChainCache::with_defaults(), + }; + index.init_schema()?; + Ok(index) + } + + /// Get a read connection from the pool. + fn read_conn(&self) -> ChainIndexResult> { + self.read_pool + .get() + .map_err(|e| ChainIndexError::Sqlite(e.to_string())) } /// Get direct access to the cache. @@ -196,57 +155,250 @@ impl PersistentChainIndex { Ok(()) } + fn init_schema(&self) -> ChainIndexResult<()> { + let conn = self.writer.lock().unwrap(); + conn.execute_batch( + "CREATE TABLE IF NOT EXISTS blocks ( + number INTEGER PRIMARY KEY, + hash BLOB NOT NULL UNIQUE, + parent_hash BLOB NOT NULL, + state_root BLOB NOT NULL, + transactions_root BLOB NOT NULL, + receipts_root BLOB NOT NULL, + timestamp INTEGER NOT NULL, + gas_used INTEGER NOT NULL, + gas_limit INTEGER NOT NULL, + transaction_count INTEGER NOT NULL, + miner BLOB NOT NULL, + extra_data BLOB NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_blocks_hash ON blocks(hash); + + CREATE TABLE IF NOT EXISTS transactions ( + hash BLOB PRIMARY KEY, + block_number INTEGER NOT NULL, + block_hash BLOB NOT NULL, + transaction_index INTEGER NOT NULL, + from_addr BLOB NOT NULL, + to_addr BLOB, + value BLOB NOT NULL, + gas INTEGER NOT NULL, + gas_price BLOB NOT NULL, + input BLOB NOT NULL, + nonce INTEGER NOT NULL, + v INTEGER NOT NULL, + r BLOB NOT NULL, + s BLOB NOT NULL, + tx_type INTEGER NOT NULL, + chain_id INTEGER, + FOREIGN KEY (block_number) REFERENCES blocks(number) + ); + CREATE INDEX IF NOT EXISTS idx_tx_block ON transactions(block_number); + + CREATE TABLE IF NOT EXISTS receipts ( + 
transaction_hash BLOB PRIMARY KEY, + transaction_index INTEGER NOT NULL, + block_hash BLOB NOT NULL, + block_number INTEGER NOT NULL, + from_addr BLOB NOT NULL, + to_addr BLOB, + cumulative_gas_used INTEGER NOT NULL, + gas_used INTEGER NOT NULL, + contract_address BLOB, + status INTEGER NOT NULL, + tx_type INTEGER NOT NULL, + FOREIGN KEY (block_number) REFERENCES blocks(number) + ); + CREATE INDEX IF NOT EXISTS idx_receipts_block ON receipts(block_number); + + CREATE TABLE IF NOT EXISTS logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + block_number INTEGER NOT NULL, + transaction_hash BLOB NOT NULL, + address BLOB NOT NULL, + topics BLOB NOT NULL, + data BLOB NOT NULL, + FOREIGN KEY (block_number) REFERENCES blocks(number) + ); + CREATE INDEX IF NOT EXISTS idx_logs_block ON logs(block_number); + CREATE INDEX IF NOT EXISTS idx_logs_address ON logs(address); + CREATE INDEX IF NOT EXISTS idx_logs_tx ON logs(transaction_hash); + + CREATE TABLE IF NOT EXISTS metadata ( + key TEXT PRIMARY KEY, + value BLOB NOT NULL + );", + )?; + Ok(()) + } + fn load_latest_block_number(&self) -> ChainIndexResult> { - let value = self.storage.get(keys::META_LATEST)?; - match value { - Some(bytes) if bytes.len() == 8 => { - let arr: [u8; 8] = bytes.try_into().unwrap(); - Ok(Some(u64::from_be_bytes(arr))) - } - Some(_) => Err(ChainIndexError::Deserialization( - "invalid latest block format".to_string(), - )), - None => Ok(None), + let conn = self.read_conn()?; + let result: rusqlite::Result = conn.query_row( + "SELECT CAST(value AS INTEGER) FROM metadata WHERE key = 'latest_block'", + [], + |row| row.get(0), + ); + match result { + Ok(n) => Ok(Some(n as u64)), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), } } + + fn row_to_stored_block(row: &rusqlite::Row<'_>) -> rusqlite::Result { + use alloy_primitives::Bytes; + + let number: i64 = row.get(0)?; + let hash_bytes: Vec = row.get(1)?; + let parent_hash_bytes: Vec = row.get(2)?; + let state_root_bytes: Vec 
= row.get(3)?; + let transactions_root_bytes: Vec = row.get(4)?; + let receipts_root_bytes: Vec = row.get(5)?; + let timestamp: i64 = row.get(6)?; + let gas_used: i64 = row.get(7)?; + let gas_limit: i64 = row.get(8)?; + let transaction_count: i64 = row.get(9)?; + let miner_bytes: Vec = row.get(10)?; + let extra_data_bytes: Vec = row.get(11)?; + + Ok(StoredBlock { + number: number as u64, + hash: b256_from_row(&hash_bytes, 1)?, + parent_hash: b256_from_row(&parent_hash_bytes, 2)?, + state_root: b256_from_row(&state_root_bytes, 3)?, + transactions_root: b256_from_row(&transactions_root_bytes, 4)?, + receipts_root: b256_from_row(&receipts_root_bytes, 5)?, + timestamp: timestamp as u64, + gas_used: gas_used as u64, + gas_limit: gas_limit as u64, + transaction_count: transaction_count as u32, + miner: address_from_row(&miner_bytes, 10)?, + extra_data: Bytes::from(extra_data_bytes), + }) + } + + fn row_to_stored_transaction(row: &rusqlite::Row<'_>) -> rusqlite::Result { + use alloy_primitives::{Bytes, U256}; + + let hash_bytes: Vec = row.get(0)?; + let block_number: i64 = row.get(1)?; + let block_hash_bytes: Vec = row.get(2)?; + let transaction_index: i64 = row.get(3)?; + let from_bytes: Vec = row.get(4)?; + let to_bytes: Option> = row.get(5)?; + let value_bytes: Vec = row.get(6)?; + let gas: i64 = row.get(7)?; + let gas_price_bytes: Vec = row.get(8)?; + let input_bytes: Vec = row.get(9)?; + let nonce: i64 = row.get(10)?; + let v: i64 = row.get(11)?; + let r_bytes: Vec = row.get(12)?; + let s_bytes: Vec = row.get(13)?; + let tx_type: i64 = row.get(14)?; + let chain_id: Option = row.get(15)?; + + let to = to_bytes + .as_deref() + .map(|b| address_from_row(b, 5)) + .transpose()?; + + Ok(StoredTransaction { + hash: b256_from_row(&hash_bytes, 0)?, + block_number: block_number as u64, + block_hash: b256_from_row(&block_hash_bytes, 2)?, + transaction_index: transaction_index as u32, + from: address_from_row(&from_bytes, 4)?, + to, + value: U256::from_be_slice(&value_bytes), + 
gas: gas as u64, + gas_price: U256::from_be_slice(&gas_price_bytes), + input: Bytes::from(input_bytes), + nonce: nonce as u64, + v: v as u64, + r: U256::from_be_slice(&r_bytes), + s: U256::from_be_slice(&s_bytes), + tx_type: tx_type as u8, + chain_id: chain_id.map(|c| c as u64), + }) + } + + fn row_to_stored_receipt(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let transaction_hash_bytes: Vec = row.get(0)?; + let transaction_index: i64 = row.get(1)?; + let block_hash_bytes: Vec = row.get(2)?; + let block_number: i64 = row.get(3)?; + let from_bytes: Vec = row.get(4)?; + let to_bytes: Option> = row.get(5)?; + let cumulative_gas_used: i64 = row.get(6)?; + let gas_used: i64 = row.get(7)?; + let contract_address_bytes: Option> = row.get(8)?; + let status: i64 = row.get(9)?; + let tx_type: i64 = row.get(10)?; + + let to = to_bytes + .as_deref() + .map(|b| address_from_row(b, 5)) + .transpose()?; + let contract_address = contract_address_bytes + .as_deref() + .map(|b| address_from_row(b, 8)) + .transpose()?; + + Ok(StoredReceipt { + transaction_hash: b256_from_row(&transaction_hash_bytes, 0)?, + transaction_index: transaction_index as u32, + block_hash: b256_from_row(&block_hash_bytes, 2)?, + block_number: block_number as u64, + from: address_from_row(&from_bytes, 4)?, + to, + cumulative_gas_used: cumulative_gas_used as u64, + gas_used: gas_used as u64, + contract_address, + logs: vec![], // logs are stored separately + status: status as u8, + tx_type: tx_type as u8, + }) + } } -impl ChainIndex for PersistentChainIndex { +impl ChainIndex for PersistentChainIndex { fn latest_block_number(&self) -> ChainIndexResult> { - // Check cache first if let Some(n) = self.cache.latest_block_number() { return Ok(Some(n)); } - // Fall back to storage self.load_latest_block_number() } fn get_block(&self, number: u64) -> ChainIndexResult> { - // Check cache first if let Some(block) = self.cache.get_block_by_number(number) { return Ok(Some((*block).clone())); } - // Load from storage 
(synchronous) - let key = keys::block_header_key(number); - match self.storage.get(&key)? { - Some(bytes) => { - let block: StoredBlock = serde_json::from_slice(&bytes)?; - // Populate cache + let conn = self.read_conn()?; + let result = conn.query_row( + "SELECT number, hash, parent_hash, state_root, transactions_root, receipts_root, + timestamp, gas_used, gas_limit, transaction_count, miner, extra_data + FROM blocks WHERE number = ?", + params![number as i64], + Self::row_to_stored_block, + ); + + match result { + Ok(block) => { self.cache.insert_block(block.clone()); Ok(Some(block)) } - None => Ok(None), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), } } fn get_block_by_hash(&self, hash: B256) -> ChainIndexResult> { - // Check cache first if let Some(block) = self.cache.get_block_by_hash(hash) { return Ok(Some((*block).clone())); } - // Look up block number by hash let number = match self.get_block_number(hash)? { Some(n) => n, None => return Ok(None), @@ -256,148 +408,129 @@ impl ChainIndex for PersistentChainIndex { } fn get_block_number(&self, hash: B256) -> ChainIndexResult> { - // Check cache first if let Some(n) = self.cache.get_block_number_by_hash(hash) { return Ok(Some(n)); } - // Load from storage - let key = keys::block_number_key(&hash); - match self.storage.get(&key)? 
{ - Some(bytes) if bytes.len() == 8 => { - let arr: [u8; 8] = bytes.try_into().unwrap(); - Ok(Some(u64::from_be_bytes(arr))) - } - Some(_) => Err(ChainIndexError::Deserialization( - "invalid block number format".to_string(), - )), - None => Ok(None), + let conn = self.read_conn()?; + let result: rusqlite::Result = conn.query_row( + "SELECT number FROM blocks WHERE hash = ?", + params![hash.as_slice()], + |row| row.get(0), + ); + + match result { + Ok(n) => Ok(Some(n as u64)), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), } } fn get_block_transactions(&self, number: u64) -> ChainIndexResult> { - // New chunked layout. - let chunk_count_key = keys::block_txs_chunk_count_key(number); - if let Some(bytes) = self.storage.get(&chunk_count_key)? { - if bytes.len() != 4 { - return Err(ChainIndexError::Deserialization( - "invalid block tx chunk count format".to_string(), - )); - } - let mut count_bytes = [0u8; 4]; - count_bytes.copy_from_slice(&bytes); - let count = u32::from_be_bytes(count_bytes); - let mut hashes = Vec::new(); - for idx in 0..count { - let key = keys::block_txs_chunk_key(number, idx); - let chunk_bytes = self.storage.get(&key)?.ok_or_else(|| { - ChainIndexError::Deserialization(format!( - "missing tx chunk {} for block {}", - idx, number - )) - })?; - let mut chunk: Vec = serde_json::from_slice(&chunk_bytes)?; - hashes.append(&mut chunk); - } - return Ok(hashes); - } + let conn = self.read_conn()?; + let mut stmt = conn.prepare( + "SELECT hash FROM transactions WHERE block_number = ? ORDER BY transaction_index", + )?; + + let hashes: rusqlite::Result> = stmt + .query_map(params![number as i64], |row| { + let bytes: Vec = row.get(0)?; + b256_from_row(&bytes, 0) + })? + .collect(); - // Legacy single-value layout. - let key = keys::block_txs_key(number); - match self.storage.get(&key)? { - Some(bytes) => { - let hashes: Vec = serde_json::from_slice(&bytes)?; - Ok(hashes) - } - None => Ok(vec![]), - } + Ok(hashes?) 
} fn get_transaction(&self, hash: B256) -> ChainIndexResult> { - // Check cache first if let Some(tx) = self.cache.get_transaction(hash) { return Ok(Some((*tx).clone())); } - // Load from storage - let key = keys::tx_data_key(&hash); - match self.storage.get(&key)? { - Some(bytes) => { - let tx: StoredTransaction = serde_json::from_slice(&bytes)?; - // Populate cache + let conn = self.read_conn()?; + let result = conn.query_row( + "SELECT hash, block_number, block_hash, transaction_index, from_addr, to_addr, + value, gas, gas_price, input, nonce, v, r, s, tx_type, chain_id + FROM transactions WHERE hash = ?", + params![hash.as_slice()], + Self::row_to_stored_transaction, + ); + + match result { + Ok(tx) => { self.cache.insert_transaction(tx.clone()); Ok(Some(tx)) } - None => Ok(None), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), } } fn get_transaction_location(&self, hash: B256) -> ChainIndexResult> { - let key = keys::tx_location_key(&hash); - match self.storage.get(&key)? { - Some(bytes) => { - let loc: TxLocation = serde_json::from_slice(&bytes)?; - Ok(Some(loc)) - } - None => Ok(None), + let conn = self.read_conn()?; + let result: rusqlite::Result<(i64, i64)> = conn.query_row( + "SELECT block_number, transaction_index FROM transactions WHERE hash = ?", + params![hash.as_slice()], + |row| Ok((row.get(0)?, row.get(1)?)), + ); + + match result { + Ok((block_number, transaction_index)) => Ok(Some(TxLocation { + block_number: block_number as u64, + transaction_index: transaction_index as u32, + })), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), } } fn get_receipt(&self, hash: B256) -> ChainIndexResult> { - // Check cache first if let Some(receipt) = self.cache.get_receipt(hash) { return Ok(Some((*receipt).clone())); } - // Load from storage - let key = keys::tx_receipt_key(&hash); - match self.storage.get(&key)? 
{ - Some(bytes) => { - let receipt: StoredReceipt = serde_json::from_slice(&bytes)?; - // Populate cache + let conn = self.read_conn()?; + let result = conn.query_row( + "SELECT transaction_hash, transaction_index, block_hash, block_number, + from_addr, to_addr, cumulative_gas_used, gas_used, contract_address, + status, tx_type + FROM receipts WHERE transaction_hash = ?", + params![hash.as_slice()], + Self::row_to_stored_receipt, + ); + + match result { + Ok(mut receipt) => { + // Load logs for this receipt + let logs = load_logs_for_tx(&conn, hash)?; + receipt.logs = logs; self.cache.insert_receipt(receipt.clone()); Ok(Some(receipt)) } - None => Ok(None), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), } } fn get_logs_by_block(&self, number: u64) -> ChainIndexResult> { - // New chunked layout. - let chunk_count_key = keys::logs_by_block_chunk_count_key(number); - if let Some(bytes) = self.storage.get(&chunk_count_key)? { - if bytes.len() != 4 { - return Err(ChainIndexError::Deserialization( - "invalid block logs chunk count format".to_string(), - )); - } - let mut count_bytes = [0u8; 4]; - count_bytes.copy_from_slice(&bytes); - let count = u32::from_be_bytes(count_bytes); - let mut logs = Vec::new(); - for idx in 0..count { - let key = keys::logs_by_block_chunk_key(number, idx); - let chunk_bytes = self.storage.get(&key)?.ok_or_else(|| { - ChainIndexError::Deserialization(format!( - "missing log chunk {} for block {}", - idx, number - )) - })?; - let mut chunk: Vec = serde_json::from_slice(&chunk_bytes)?; - logs.append(&mut chunk); - } - return Ok(logs); - } + let conn = self.read_conn()?; + let mut stmt = conn + .prepare("SELECT address, topics, data FROM logs WHERE block_number = ? 
ORDER BY id")?; + + let logs: rusqlite::Result> = stmt + .query_map(params![number as i64], |row| { + let address_bytes: Vec = row.get(0)?; + let topics_json: Vec = row.get(1)?; + let data_bytes: Vec = row.get(2)?; + Ok((address_bytes, topics_json, data_bytes)) + })? + .map(|r| { + let (address_bytes, topics_json, data_bytes) = r?; + parse_stored_log(&address_bytes, &topics_json, &data_bytes) + }) + .collect(); - // Legacy single-value layout. - let key = keys::logs_by_block_key(number); - match self.storage.get(&key)? { - Some(bytes) => { - let logs: Vec = serde_json::from_slice(&bytes)?; - Ok(logs) - } - None => Ok(vec![]), - } + Ok(logs?) } fn store_block( @@ -409,110 +542,67 @@ impl ChainIndex for PersistentChainIndex { let block_number = block.number; let block_hash = block.hash; - // Collect all logs from receipts - let all_logs: Vec = receipts.iter().flat_map(|r| r.logs.clone()).collect(); - - // Build batch operations - let mut ops = Vec::new(); - - // Store block header - ops.push(evolve_storage::Operation::Set { - key: keys::block_header_key(block_number), - value: serde_json::to_vec(&block)?, - }); - - // Store block number by hash - ops.push(evolve_storage::Operation::Set { - key: keys::block_number_key(&block_hash), - value: block_number.to_be_bytes().to_vec(), - }); - - // Store transaction hashes for this block in chunked format to avoid - // oversize values for high-tx blocks. 
- let tx_hashes: Vec = transactions.iter().map(|tx| tx.hash).collect(); - let tx_hash_chunks: Vec> = tx_hashes - .chunks(TX_HASH_CHUNK_SIZE) - .map(|c| c.to_vec()) - .collect(); - ops.push(evolve_storage::Operation::Set { - key: keys::block_txs_chunk_count_key(block_number), - value: (tx_hash_chunks.len() as u32).to_be_bytes().to_vec(), - }); - for (idx, chunk) in tx_hash_chunks.iter().enumerate() { - ops.push(evolve_storage::Operation::Set { - key: keys::block_txs_chunk_key(block_number, idx as u32), - value: serde_json::to_vec(chunk)?, - }); + let mut conn = self.writer.lock().unwrap(); + let tx = conn.transaction()?; + + // Delete existing data for this block number (handles re-indexing the same block) + tx.execute( + "DELETE FROM logs WHERE block_number = ?", + params![block_number as i64], + )?; + tx.execute( + "DELETE FROM receipts WHERE block_number = ?", + params![block_number as i64], + )?; + tx.execute( + "DELETE FROM transactions WHERE block_number = ?", + params![block_number as i64], + )?; + + // Insert block + tx.execute( + "INSERT OR REPLACE INTO blocks + (number, hash, parent_hash, state_root, transactions_root, receipts_root, + timestamp, gas_used, gas_limit, transaction_count, miner, extra_data) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + params![ + block.number as i64, + block.hash.as_slice(), + block.parent_hash.as_slice(), + block.state_root.as_slice(), + block.transactions_root.as_slice(), + block.receipts_root.as_slice(), + block.timestamp as i64, + block.gas_used as i64, + block.gas_limit as i64, + block.transaction_count as i64, + block.miner.as_slice(), + block.extra_data.as_ref(), + ], + )?; + + // Insert transactions using array index as the authoritative transaction_index + for (idx, transaction) in transactions.iter().enumerate() { + insert_transaction(&tx, transaction, idx as i64)?; } - // Store each transaction - for (idx, tx) in transactions.iter().enumerate() { - ops.push(evolve_storage::Operation::Set { - key: 
keys::tx_data_key(&tx.hash), - value: serde_json::to_vec(tx)?, - }); - - ops.push(evolve_storage::Operation::Set { - key: keys::tx_location_key(&tx.hash), - value: serde_json::to_vec(&TxLocation { - block_number, - transaction_index: idx as u32, - })?, - }); + // Insert receipts and logs + for (idx, receipt) in receipts.iter().enumerate() { + insert_receipt(&tx, receipt, idx as i64)?; + for log in &receipt.logs { + insert_log(&tx, block_number, receipt.transaction_hash, log)?; + } } - // Store each receipt - for receipt in &receipts { - ops.push(evolve_storage::Operation::Set { - key: keys::tx_receipt_key(&receipt.transaction_hash), - value: serde_json::to_vec(receipt)?, - }); - } + // Update latest block number metadata + tx.execute( + "INSERT OR REPLACE INTO metadata (key, value) VALUES ('latest_block', ?)", + params![block_number as i64], + )?; - // Store logs by block in chunked format to avoid oversize values. - if !all_logs.is_empty() { - let log_chunks: Vec> = all_logs - .chunks(LOG_CHUNK_SIZE) - .map(|c| c.to_vec()) - .collect(); - ops.push(evolve_storage::Operation::Set { - key: keys::logs_by_block_chunk_count_key(block_number), - value: (log_chunks.len() as u32).to_be_bytes().to_vec(), - }); - for (idx, chunk) in log_chunks.iter().enumerate() { - ops.push(evolve_storage::Operation::Set { - key: keys::logs_by_block_chunk_key(block_number, idx as u32), - value: serde_json::to_vec(chunk)?, - }); - } - } else { - ops.push(evolve_storage::Operation::Set { - key: keys::logs_by_block_chunk_count_key(block_number), - value: 0u32.to_be_bytes().to_vec(), - }); - } + tx.commit()?; - // Update latest block number - ops.push(evolve_storage::Operation::Set { - key: keys::META_LATEST.to_vec(), - value: block_number.to_be_bytes().to_vec(), - }); - - // Batch write and commit using block_on for the async operations - // This is acceptable because writes are infrequent and not latency-sensitive - futures::executor::block_on(async { - self.storage - .batch(ops) - .await - 
.map_err(|e| ChainIndexError::Storage(format!("batch write failed: {:?}", e)))?; - self.storage - .commit() - .await - .map_err(|e| ChainIndexError::Storage(format!("commit failed: {:?}", e)))?; - Ok::<_, ChainIndexError>(()) - })?; - - // Update cache + // Update cache after successful commit self.cache .insert_block_with_data(block, transactions, receipts); @@ -522,61 +612,164 @@ impl ChainIndex for PersistentChainIndex { } } -#[cfg(test)] -#[allow(clippy::disallowed_types)] -mod tests { - use super::*; - use alloy_primitives::{Address, Bytes, U256}; - use async_trait::async_trait; - use evolve_core::ErrorCode; - use evolve_storage::{CommitHash, Operation}; - use std::collections::HashMap; - use std::sync::RwLock; +fn insert_transaction( + tx: &rusqlite::Transaction<'_>, + transaction: &StoredTransaction, + array_index: i64, +) -> ChainIndexResult<()> { + tx.execute( + "INSERT OR REPLACE INTO transactions + (hash, block_number, block_hash, transaction_index, from_addr, to_addr, + value, gas, gas_price, input, nonce, v, r, s, tx_type, chain_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + params![ + transaction.hash.as_slice(), + transaction.block_number as i64, + transaction.block_hash.as_slice(), + array_index, + transaction.from.as_slice(), + transaction.to.as_ref().map(|a| a.as_slice()), + &u256_to_be_bytes(transaction.value) as &[u8], + transaction.gas as i64, + &u256_to_be_bytes(transaction.gas_price) as &[u8], + transaction.input.as_ref(), + transaction.nonce as i64, + transaction.v as i64, + &u256_to_be_bytes(transaction.r) as &[u8], + &u256_to_be_bytes(transaction.s) as &[u8], + transaction.tx_type as i64, + transaction.chain_id.map(|c| c as i64), + ], + )?; + Ok(()) +} - /// A mock storage implementation for testing. 
- pub struct MockStorage { - data: RwLock, Vec>>, - commit_count: RwLock, - } +fn insert_receipt( + tx: &rusqlite::Transaction<'_>, + receipt: &StoredReceipt, + array_index: i64, +) -> ChainIndexResult<()> { + tx.execute( + "INSERT OR REPLACE INTO receipts + (transaction_hash, transaction_index, block_hash, block_number, from_addr, to_addr, + cumulative_gas_used, gas_used, contract_address, status, tx_type) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + params![ + receipt.transaction_hash.as_slice(), + array_index, + receipt.block_hash.as_slice(), + receipt.block_number as i64, + receipt.from.as_slice(), + receipt.to.as_ref().map(|a| a.as_slice()), + receipt.cumulative_gas_used as i64, + receipt.gas_used as i64, + receipt.contract_address.as_ref().map(|a| a.as_slice()), + receipt.status as i64, + receipt.tx_type as i64, + ], + )?; + Ok(()) +} - impl MockStorage { - pub fn new() -> Self { - Self { - data: RwLock::new(HashMap::new()), - commit_count: RwLock::new(0), - } - } +fn insert_log( + tx: &rusqlite::Transaction<'_>, + block_number: u64, + tx_hash: B256, + log: &StoredLog, +) -> ChainIndexResult<()> { + let topics_json = serde_json::to_vec(&log.topics) + .map_err(|e| ChainIndexError::Serialization(e.to_string()))?; + + tx.execute( + "INSERT INTO logs (block_number, transaction_hash, address, topics, data) + VALUES (?, ?, ?, ?, ?)", + params![ + block_number as i64, + tx_hash.as_slice(), + log.address.as_slice(), + topics_json.as_slice(), + log.data.as_ref(), + ], + )?; + Ok(()) +} + +fn load_logs_for_tx(conn: &Connection, tx_hash: B256) -> ChainIndexResult> { + let mut stmt = conn + .prepare("SELECT address, topics, data FROM logs WHERE transaction_hash = ? ORDER BY id")?; + + let logs: rusqlite::Result> = stmt + .query_map(params![tx_hash.as_slice()], |row| { + let address_bytes: Vec = row.get(0)?; + let topics_json: Vec = row.get(1)?; + let data_bytes: Vec = row.get(2)?; + Ok((address_bytes, topics_json, data_bytes)) + })? 
+ .map(|r| { + let (address_bytes, topics_json, data_bytes) = r?; + parse_stored_log(&address_bytes, &topics_json, &data_bytes) + }) + .collect(); + + Ok(logs?) +} + +fn parse_stored_log( + address_bytes: &[u8], + topics_json: &[u8], + data_bytes: &[u8], +) -> rusqlite::Result { + use alloy_primitives::Bytes; + + let topics: Vec = serde_json::from_slice(topics_json).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure(1, rusqlite::types::Type::Blob, Box::new(e)) + })?; + + Ok(StoredLog { + address: address_from_row(address_bytes, 0)?, + topics, + data: Bytes::from(data_bytes.to_vec()), + }) +} + +/// Generate a unique ID for in-memory shared-cache SQLite databases. +fn unique_id() -> u64 { + use std::sync::atomic::{AtomicU64, Ordering}; + static COUNTER: AtomicU64 = AtomicU64::new(0); + COUNTER.fetch_add(1, Ordering::Relaxed) +} + +fn b256_from_row(bytes: &[u8], col: usize) -> rusqlite::Result { + if bytes.len() != 32 { + return Err(rusqlite::Error::FromSqlConversionFailure( + col, + rusqlite::types::Type::Blob, + format!("expected 32 bytes for B256, got {}", bytes.len()).into(), + )); } + Ok(B256::from_slice(bytes)) +} - impl evolve_core::ReadonlyKV for MockStorage { - fn get(&self, key: &[u8]) -> Result>, ErrorCode> { - Ok(self.data.read().unwrap().get(key).cloned()) - } +fn address_from_row(bytes: &[u8], col: usize) -> rusqlite::Result { + if bytes.len() != 20 { + return Err(rusqlite::Error::FromSqlConversionFailure( + col, + rusqlite::types::Type::Blob, + format!("expected 20 bytes for Address, got {}", bytes.len()).into(), + )); } + Ok(alloy_primitives::Address::from_slice(bytes)) +} - #[async_trait(?Send)] - impl evolve_storage::Storage for MockStorage { - async fn commit(&self) -> Result { - let mut count = self.commit_count.write().unwrap(); - *count += 1; - Ok(CommitHash::new([0u8; 32])) - } +fn u256_to_be_bytes(value: alloy_primitives::U256) -> [u8; 32] { + value.to_be_bytes() +} - async fn batch(&self, operations: Vec) -> Result<(), ErrorCode> { - 
let mut data = self.data.write().unwrap(); - for op in operations { - match op { - Operation::Set { key, value } => { - data.insert(key, value); - } - Operation::Remove { key } => { - data.remove(&key); - } - } - } - Ok(()) - } - } +#[cfg(test)] +#[allow(clippy::disallowed_types)] +mod tests { + use super::*; + use alloy_primitives::{Address, Bytes, U256}; /// Helper to create a test StoredBlock. pub fn make_test_stored_block(number: u64) -> StoredBlock { @@ -652,16 +845,11 @@ mod tests { } // ==================== Complex behavior tests ==================== - // Note: Basic store/get operations are covered by model-based tests in model_tests module. - // These tests focus on complex behaviors not easily covered by model tests. /// Tests that transaction location uses array index, not the field value. - /// This is important because TxLocation.transaction_index should reflect - /// the actual position in the block, not what was passed in StoredTransaction. #[test] fn test_transaction_location_uses_array_index() { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); + let index = PersistentChainIndex::in_memory().unwrap(); let block = make_test_stored_block(100); // Create tx with transaction_index=5, but it will be at array position 0 @@ -679,8 +867,7 @@ mod tests { /// Tests that transaction ordering is preserved when storing multiple txs. #[test] fn test_transaction_ordering_preserved() { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); + let index = PersistentChainIndex::in_memory().unwrap(); let block = make_test_stored_block(100); let hashes: Vec<_> = (0..5).map(|i| B256::repeat_byte(0x50 + i)).collect(); @@ -705,12 +892,10 @@ mod tests { /// Tests log aggregation from multiple receipts. 
#[test] fn test_logs_aggregated_from_receipts() { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); + let index = PersistentChainIndex::in_memory().unwrap(); let block = make_test_stored_block(100); - // Two transactions, each with logs let tx1_hash = B256::repeat_byte(0x60); let tx2_hash = B256::repeat_byte(0x61); @@ -750,8 +935,7 @@ mod tests { /// Tests cache population after store_block. #[test] fn test_cache_populated_on_store() { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); + let index = PersistentChainIndex::in_memory().unwrap(); let block = make_test_stored_block(100); let block_hash = block.hash; @@ -772,180 +956,25 @@ mod tests { /// Tests persistence across index instances (simulating restart). #[test] fn test_persistence_across_restart() { - let storage = Arc::new(MockStorage::new()); + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("chain-index.sqlite"); // First instance stores data - let index1 = PersistentChainIndex::new(Arc::clone(&storage)); - for i in 0..5 { - let block = make_test_stored_block(i); - index1.store_block(block, vec![], vec![]).unwrap(); + { + let index1 = PersistentChainIndex::new(&db_path).unwrap(); + for i in 0..5 { + let block = make_test_stored_block(i); + index1.store_block(block, vec![], vec![]).unwrap(); + } } // Second instance (simulating restart) should recover state - let index2 = PersistentChainIndex::new(storage); + let index2 = PersistentChainIndex::new(&db_path).unwrap(); index2.initialize().unwrap(); assert_eq!(index2.latest_block_number().unwrap(), Some(4)); assert!(index2.get_block(3).unwrap().is_some()); } - - /// Tests chunked tx hash roundtrip with more than TX_HASH_CHUNK_SIZE txs. 
- #[test] - fn test_chunked_txs_roundtrip() { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); - - let mut block = make_test_stored_block(1); - let tx_count = super::TX_HASH_CHUNK_SIZE + 10; // 42 txs -- spans 2 chunks - block.transaction_count = tx_count as u32; - - let txs: Vec = (0..tx_count) - .map(|i| { - let hash = B256::from_slice(&{ - let mut bytes = [0u8; 32]; - bytes[0..8].copy_from_slice(&(i as u64).to_be_bytes()); - bytes - }); - make_test_stored_transaction(hash, 1, block.hash, i as u32) - }) - .collect(); - let receipts: Vec = txs - .iter() - .enumerate() - .map(|(i, tx)| make_test_stored_receipt(tx.hash, 1, block.hash, i as u32, true)) - .collect(); - - let expected_hashes: Vec = txs.iter().map(|tx| tx.hash).collect(); - index.store_block(block, txs, receipts).unwrap(); - - let retrieved = index.get_block_transactions(1).unwrap(); - assert_eq!(retrieved.len(), tx_count); - assert_eq!(retrieved, expected_hashes); - } - - /// Tests chunk boundary conditions: exactly one full chunk, one-over, and zero txs. 
- #[test] - fn test_chunked_txs_boundary() { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); - - // Exactly TX_HASH_CHUNK_SIZE (one full chunk) - let block = make_test_stored_block(1); - let count = super::TX_HASH_CHUNK_SIZE; - let txs: Vec = (0..count) - .map(|i| { - let hash = B256::repeat_byte(i as u8); - make_test_stored_transaction(hash, 1, block.hash, i as u32) - }) - .collect(); - let receipts: Vec = txs - .iter() - .enumerate() - .map(|(i, tx)| make_test_stored_receipt(tx.hash, 1, block.hash, i as u32, true)) - .collect(); - index.store_block(block.clone(), txs, receipts).unwrap(); - assert_eq!(index.get_block_transactions(1).unwrap().len(), count); - - // TX_HASH_CHUNK_SIZE + 1 (boundary) - let block2 = make_test_stored_block(2); - let count2 = super::TX_HASH_CHUNK_SIZE + 1; - let txs2: Vec = (0..count2) - .map(|i| { - let mut bytes = [0u8; 32]; - bytes[0] = 0xBB; - bytes[1..9].copy_from_slice(&(i as u64).to_be_bytes()); - let hash = B256::from_slice(&bytes); - make_test_stored_transaction(hash, 2, block2.hash, i as u32) - }) - .collect(); - let receipts2: Vec = txs2 - .iter() - .enumerate() - .map(|(i, tx)| make_test_stored_receipt(tx.hash, 2, block2.hash, i as u32, true)) - .collect(); - index.store_block(block2, txs2, receipts2).unwrap(); - assert_eq!(index.get_block_transactions(2).unwrap().len(), count2); - - // 0 txs - let block3 = make_test_stored_block(3); - index.store_block(block3, vec![], vec![]).unwrap(); - assert_eq!(index.get_block_transactions(3).unwrap().len(), 0); - } - - /// Tests chunked log roundtrip with more than LOG_CHUNK_SIZE logs. 
- #[test] - fn test_chunked_logs_roundtrip() { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); - - let block = make_test_stored_block(1); - let tx_hash = B256::repeat_byte(0xAA); - let tx = make_test_stored_transaction(tx_hash, 1, block.hash, 0); - - let log_count = super::LOG_CHUNK_SIZE + 5; // 21 logs -- spans 2 chunks - let mut receipt = make_test_stored_receipt(tx_hash, 1, block.hash, 0, true); - receipt.logs = (0..log_count) - .map(|i| StoredLog { - address: Address::repeat_byte(i as u8), - topics: vec![B256::repeat_byte(i as u8)], - data: Bytes::from(vec![i as u8]), - }) - .collect(); - - index.store_block(block, vec![tx], vec![receipt]).unwrap(); - - let logs = index.get_logs_by_block(1).unwrap(); - assert_eq!(logs.len(), log_count); - // Verify ordering preserved - for (i, log) in logs.iter().enumerate() { - assert_eq!(log.address, Address::repeat_byte(i as u8)); - } - } - - /// Tests that a missing chunk key returns an error instead of silent data loss. 
- #[test] - fn test_missing_chunk_returns_error() { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(Arc::clone(&storage)); - - // Store a block with enough txs to create 2 chunks - let block = make_test_stored_block(1); - let count = super::TX_HASH_CHUNK_SIZE + 1; - let txs: Vec = (0..count) - .map(|i| { - let mut bytes = [0u8; 32]; - bytes[0..8].copy_from_slice(&(i as u64).to_be_bytes()); - let hash = B256::from_slice(&bytes); - make_test_stored_transaction(hash, 1, block.hash, i as u32) - }) - .collect(); - let receipts: Vec = txs - .iter() - .enumerate() - .map(|(i, tx)| make_test_stored_receipt(tx.hash, 1, block.hash, i as u32, true)) - .collect(); - index.store_block(block, txs, receipts).unwrap(); - - // Verify it works first - assert_eq!(index.get_block_transactions(1).unwrap().len(), count); - - // Delete the second chunk key from storage directly - let chunk_key = super::keys::block_txs_chunk_key(1, 1); - { - let mut data = storage.data.write().unwrap(); - data.remove(&chunk_key); - } - - // Now reading should return an error, not silently skip - let result = index.get_block_transactions(1); - assert!(result.is_err()); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("missing tx chunk"), - "Expected 'missing tx chunk' error, got: {}", - err_msg - ); - } } // ==================== Model-based tests ==================== @@ -953,7 +982,7 @@ mod tests { #[allow(clippy::disallowed_types)] mod model_tests { use super::tests::{ - make_test_stored_block, make_test_stored_receipt, make_test_stored_transaction, MockStorage, + make_test_stored_block, make_test_stored_receipt, make_test_stored_transaction, }; use super::*; use alloy_primitives::Address; @@ -961,8 +990,6 @@ mod model_tests { use std::collections::HashMap; /// A simple reference model for ChainIndex behavior. 
- /// This is a straightforward HashMap-based implementation that serves - /// as the "oracle" to verify the real implementation against. #[derive(Debug, Default, Clone)] #[allow(dead_code)] struct ChainIndexModel { @@ -990,11 +1017,9 @@ mod model_tests { let block_number = block.number; let block_hash = block.hash; - // Store block self.blocks_by_number.insert(block_number, block.clone()); self.blocks_by_hash.insert(block_hash, block); - // Store transactions and their locations for (idx, tx) in transactions.iter().enumerate() { self.transactions.insert(tx.hash, tx.clone()); self.tx_locations.insert( @@ -1006,7 +1031,6 @@ mod model_tests { ); } - // Store receipts and collect logs let mut all_logs = Vec::new(); for receipt in receipts { all_logs.extend(receipt.logs.clone()); @@ -1016,7 +1040,6 @@ mod model_tests { self.logs_by_block.insert(block_number, all_logs); } - // Update latest block self.latest_block = Some( self.latest_block .map(|l| l.max(block_number)) @@ -1060,32 +1083,13 @@ mod model_tests { /// Operations that can be performed on a ChainIndex. 
#[derive(Debug, Clone)] enum Operation { - StoreBlock { - block_number: u64, - tx_count: usize, - }, - GetBlock { - number: u64, - }, - GetBlockByHash { - /// Index into previously stored blocks (modulo stored count) - block_idx: usize, - }, - GetTransaction { - /// Index into previously stored transactions (modulo stored count) - tx_idx: usize, - }, - GetReceipt { - /// Index into previously stored transactions (modulo stored count) - tx_idx: usize, - }, - GetTransactionLocation { - /// Index into previously stored transactions (modulo stored count) - tx_idx: usize, - }, - GetLogsByBlock { - number: u64, - }, + StoreBlock { block_number: u64, tx_count: usize }, + GetBlock { number: u64 }, + GetBlockByHash { block_idx: usize }, + GetTransaction { tx_idx: usize }, + GetReceipt { tx_idx: usize }, + GetTransactionLocation { tx_idx: usize }, + GetLogsByBlock { number: u64 }, GetLatestBlockNumber, } @@ -1097,37 +1101,26 @@ mod model_tests { stored_tx_hashes: Vec, } - /// Strategy to generate a single operation. fn arb_operation(max_block: u64, _max_txs: usize) -> impl Strategy { prop_oneof![ - // Store block with 0-3 transactions (0..max_block, 0..=3usize).prop_map(|(block_number, tx_count)| Operation::StoreBlock { block_number, tx_count }), - // Get block by number (0..max_block).prop_map(|number| Operation::GetBlock { number }), - // Get block by hash (index into stored blocks) (0..10usize).prop_map(|block_idx| Operation::GetBlockByHash { block_idx }), - // Get transaction (0..10usize).prop_map(|tx_idx| Operation::GetTransaction { tx_idx }), - // Get receipt (0..10usize).prop_map(|tx_idx| Operation::GetReceipt { tx_idx }), - // Get transaction location (0..10usize).prop_map(|tx_idx| Operation::GetTransactionLocation { tx_idx }), - // Get logs by block (0..max_block).prop_map(|number| Operation::GetLogsByBlock { number }), - // Get latest block number Just(Operation::GetLatestBlockNumber), ] } - /// Strategy to generate a sequence of operations. 
fn arb_operations(count: usize) -> impl Strategy> { proptest::collection::vec(arb_operation(20, 5), 1..=count) } - /// Generate test data for a block with transactions. fn generate_block_data( block_number: u64, tx_count: usize, @@ -1151,7 +1144,6 @@ mod model_tests { let mut receipt = make_test_stored_receipt(tx_hash, block_number, block.hash, i as u32, true); - // Add a log to every other transaction if i % 2 == 0 { receipt.logs.push(crate::types::StoredLog { address: Address::repeat_byte((tx_id % 256) as u8), @@ -1171,8 +1163,7 @@ mod model_tests { /// Model-based test: verify that PersistentChainIndex behaves identically to the reference model. #[test] fn prop_chain_index_matches_model(operations in arb_operations(30)) { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); + let index = PersistentChainIndex::in_memory().unwrap(); let mut model = ChainIndexModel::new(); let mut state = OperationState::default(); let mut tx_counter = 0u64; @@ -1180,7 +1171,6 @@ mod model_tests { for op in operations { match op { Operation::StoreBlock { block_number, tx_count } => { - // Skip if block already stored if state.stored_block_numbers.contains(&block_number) { continue; } @@ -1188,14 +1178,12 @@ mod model_tests { let (block, txs, receipts) = generate_block_data(block_number, tx_count, &mut tx_counter); - // Track stored data state.stored_block_numbers.push(block_number); state.stored_block_hashes.push(block.hash); for tx in &txs { state.stored_tx_hashes.push(tx.hash); } - // Apply to both model.store_block(block.clone(), txs.clone(), receipts.clone()); index.store_block(block, txs, receipts).unwrap(); } @@ -1314,27 +1302,21 @@ mod model_tests { } /// Test that storing the same block number twice doesn't corrupt state. - /// The second store overwrites the first. 
#[test] fn prop_duplicate_block_storage(block_number in 0u64..100, tx_count1 in 0usize..3, tx_count2 in 0usize..3) { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); + let index = PersistentChainIndex::in_memory().unwrap(); let mut tx_counter = 0u64; - // Store first version let (block1, txs1, receipts1) = generate_block_data(block_number, tx_count1, &mut tx_counter); index.store_block(block1, txs1, receipts1).unwrap(); - // Store second version (different hash due to different timestamp in test helper) let (block2, txs2, receipts2) = generate_block_data(block_number, tx_count2, &mut tx_counter); let block2_hash = block2.hash; index.store_block(block2, txs2.clone(), receipts2).unwrap(); - // The block should reflect the second store let retrieved = index.get_block(block_number).unwrap().unwrap(); prop_assert_eq!(retrieved.hash, block2_hash); - // Transaction count should match second store let tx_hashes = index.get_block_transactions(block_number).unwrap(); prop_assert_eq!(tx_hashes.len(), txs2.len()); } @@ -1345,12 +1327,10 @@ mod model_tests { block_number in 1000u64..2000, tx_hash_bytes in proptest::collection::vec(any::(), 32) ) { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); + let index = PersistentChainIndex::in_memory().unwrap(); let tx_hash = B256::from_slice(&tx_hash_bytes); - // All queries for non-existent data should return None/empty prop_assert!(index.get_block(block_number).unwrap().is_none()); prop_assert!(index.get_block_by_hash(tx_hash).unwrap().is_none()); prop_assert!(index.get_transaction(tx_hash).unwrap().is_none()); @@ -1362,13 +1342,11 @@ mod model_tests { /// Test that block number lookups are consistent with block storage. 
#[test] fn prop_block_number_lookup_consistent(blocks in proptest::collection::vec(0u64..50, 1..10)) { - let storage = Arc::new(MockStorage::new()); - let index = PersistentChainIndex::new(storage); + let index = PersistentChainIndex::in_memory().unwrap(); let mut tx_counter = 0u64; let mut stored_blocks = Vec::new(); for &block_number in &blocks { - // Skip duplicates if stored_blocks.iter().any(|(n, _)| *n == block_number) { continue; } @@ -1379,17 +1357,13 @@ mod model_tests { stored_blocks.push((block_number, hash)); } - // Verify all lookups are consistent for (number, hash) in &stored_blocks { - // get_block_number(hash) should return the number let looked_up_number = index.get_block_number(*hash).unwrap(); prop_assert_eq!(looked_up_number, Some(*number)); - // get_block(number).hash should equal the stored hash let block = index.get_block(*number).unwrap().unwrap(); prop_assert_eq!(block.hash, *hash); - // get_block_by_hash(hash).number should equal the stored number let block_by_hash = index.get_block_by_hash(*hash).unwrap().unwrap(); prop_assert_eq!(block_by_hash.number, *number); }