Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ alloy-primitives = "0.8"
alloy-consensus = "0.9"
alloy-rlp = "0.3"
alloy-eips = "0.9"
rusqlite = { version = "0.38", features = ["bundled"] }
[workspace.lints.rust]
# missing_docs = "deny"

Expand Down
52 changes: 31 additions & 21 deletions bin/evd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,21 @@ fn run_node(config: NodeConfig, genesis_config: Option<EvdGenesisConfig>) {

// Create shared mempool
let mempool: SharedMempool<Mempool<TxContext>> = new_shared_mempool();
// Create chain index (shared between RPC and block callback)
let chain_index = Arc::new(PersistentChainIndex::new(Arc::new(storage.clone())));
if let Err(e) = chain_index.initialize() {
tracing::warn!("Failed to initialize chain index: {:?}", e);
}
// Create chain index backed by SQLite (only when needed)
let chain_index = if config.rpc.enabled || config.rpc.enable_block_indexing {
let chain_index_db_path =
std::path::PathBuf::from(&config.storage.path).join("chain-index.sqlite");
let index = Arc::new(
PersistentChainIndex::new(&chain_index_db_path)
.expect("failed to open chain index database"),
);
if let Err(e) = index.initialize() {
tracing::warn!("Failed to initialize chain index: {:?}", e);
}
Some(index)
} else {
None
};

// Set up JSON-RPC server if enabled
let rpc_handle = if config.rpc.enabled {
Expand All @@ -235,7 +245,7 @@ fn run_node(config: NodeConfig, genesis_config: Option<EvdGenesisConfig>) {
};

let state_provider = ChainStateProvider::with_mempool(
Arc::clone(&chain_index),
Arc::clone(chain_index.as_ref().expect("chain index required for RPC")),
state_provider_config,
codes_for_rpc,
mempool.clone(),
Expand Down Expand Up @@ -267,7 +277,7 @@ fn run_node(config: NodeConfig, genesis_config: Option<EvdGenesisConfig>) {

// Build the OnBlockExecuted callback: commits state to storage + indexes blocks
let storage_for_callback = storage.clone();
let chain_index_for_callback = Arc::clone(&chain_index);
let chain_index_for_callback = chain_index.clone();
let parent_hash_for_callback = Arc::clone(&parent_hash);
let current_height_for_callback = Arc::clone(&current_height);
let callback_chain_id = config.chain.chain_id;
Expand Down Expand Up @@ -315,20 +325,20 @@ fn run_node(config: NodeConfig, genesis_config: Option<EvdGenesisConfig>) {
let (stored_block, stored_txs, stored_receipts) =
build_index_data(&block, &info.block_result, &metadata);

if callback_indexing_enabled {
if let Err(e) = chain_index_for_callback.store_block(
stored_block,
stored_txs,
stored_receipts,
) {
tracing::warn!("Failed to index block {}: {:?}", info.height, e);
} else {
tracing::debug!(
"Indexed block {} (hash={}, state_root={})",
info.height,
block_hash,
state_root
);
if let Some(ref chain_index) = chain_index_for_callback {
if callback_indexing_enabled {
if let Err(e) =
chain_index.store_block(stored_block, stored_txs, stored_receipts)
{
tracing::warn!("Failed to index block {}: {:?}", info.height, e);
} else {
tracing::debug!(
"Indexed block {} (hash={}, state_root={})",
info.height,
block_hash,
state_root
);
}
}
}

Expand Down
23 changes: 15 additions & 8 deletions crates/app/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ pub fn run_dev_node_with_rpc<
path: data_dir.to_path_buf(),
..Default::default()
};
let chain_index_db_path = data_dir.join("chain-index.sqlite");

let runtime_config = TokioConfig::default()
.with_storage_directory(data_dir)
Expand All @@ -277,6 +278,7 @@ pub fn run_dev_node_with_rpc<
let run_genesis = Arc::clone(&run_genesis);
let build_storage = Arc::clone(&build_storage);
let rpc_config = rpc_config.clone();
let chain_index_db_path = chain_index_db_path.clone();

async move {
// Clone context early since build_storage takes ownership
Expand Down Expand Up @@ -323,10 +325,11 @@ pub fn run_dev_node_with_rpc<

// Set up RPC infrastructure if enabled
let rpc_handle = if rpc_config.enabled {
// Create chain index for RPC queries
// Clone storage since both DevConsensus and ChainIndex need access
let storage_for_index = storage.clone();
let chain_index = Arc::new(PersistentChainIndex::new(Arc::new(storage_for_index)));
// Create chain index backed by SQLite
let chain_index = Arc::new(
PersistentChainIndex::new(&chain_index_db_path)
.expect("failed to open chain index database"),
);

// Initialize from existing data
if let Err(e) = chain_index.initialize() {
Expand Down Expand Up @@ -366,7 +369,7 @@ pub fn run_dev_node_with_rpc<
.expect("failed to start RPC server");

// Create DevConsensus with RPC support
let dev: Arc<DevConsensus<Stf, S, Codes, Tx, PersistentChainIndex<S>>> = Arc::new(
let dev: Arc<DevConsensus<Stf, S, Codes, Tx, PersistentChainIndex>> = Arc::new(
DevConsensus::with_rpc(
stf,
storage,
Expand Down Expand Up @@ -683,6 +686,7 @@ pub fn run_dev_node_with_rpc_and_mempool_eth<
path: data_dir.to_path_buf(),
..Default::default()
};
let chain_index_db_path = data_dir.join("chain-index.sqlite");

let runtime_config = TokioConfig::default()
.with_storage_directory(data_dir)
Expand All @@ -703,6 +707,7 @@ pub fn run_dev_node_with_rpc_and_mempool_eth<
let run_genesis = Arc::clone(&run_genesis);
let build_storage = Arc::clone(&build_storage);
let rpc_config = rpc_config.clone();
let chain_index_db_path = chain_index_db_path.clone();

async move {
let context_for_shutdown = context.clone();
Expand Down Expand Up @@ -747,8 +752,10 @@ pub fn run_dev_node_with_rpc_and_mempool_eth<
let mempool: SharedMempool<Mempool<TxContext>> = new_shared_mempool();

let rpc_handle = if rpc_config.enabled {
let storage_for_index = storage.clone();
let chain_index = Arc::new(PersistentChainIndex::new(Arc::new(storage_for_index)));
let chain_index = Arc::new(
PersistentChainIndex::new(&chain_index_db_path)
.expect("failed to open chain index database"),
);

if let Err(e) = chain_index.initialize() {
tracing::warn!("Failed to initialize chain index: {:?}", e);
Expand Down Expand Up @@ -784,7 +791,7 @@ pub fn run_dev_node_with_rpc_and_mempool_eth<
.await
.expect("failed to start RPC server");

let dev: Arc<DevConsensus<Stf, S, Codes, TxContext, PersistentChainIndex<S>>> =
let dev: Arc<DevConsensus<Stf, S, Codes, TxContext, PersistentChainIndex>> =
Arc::new(
DevConsensus::with_rpc_and_mempool(
stf,
Expand Down
5 changes: 4 additions & 1 deletion crates/rpc/chain-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ archive = []
evolve_core.workspace = true
evolve_stf_traits.workspace = true
evolve_stf.workspace = true
evolve_storage.workspace = true
evolve_rpc_types.workspace = true
evolve_eth_jsonrpc.workspace = true
evolve_mempool.workspace = true
Expand All @@ -34,9 +33,13 @@ parking_lot = "0.12"
lru = "0.12"
sha2.workspace = true
futures = "0.3"
rusqlite.workspace = true
r2d2 = "0.8"
r2d2_sqlite = "0.32"

[dev-dependencies]
proptest = "1.4"
tempfile = "3"

[lints]
workspace = true
7 changes: 7 additions & 0 deletions crates/rpc/chain-index/proptest-regressions/index.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 0df5a6988f79f296c9ef1b47df11465b35b7c176c5e9e6785964bf3bf564067f # shrinks to block_number = 0, tx_count1 = 1, tx_count2 = 0
10 changes: 10 additions & 0 deletions crates/rpc/chain-index/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ pub enum ChainIndexError {
/// Index is empty (no blocks stored).
#[error("chain index is empty")]
EmptyIndex,

/// SQLite database error.
#[error("sqlite error: {0}")]
Sqlite(String),
}

impl From<serde_json::Error> for ChainIndexError {
Expand All @@ -58,5 +62,11 @@ impl From<evolve_core::ErrorCode> for ChainIndexError {
}
}

impl From<rusqlite::Error> for ChainIndexError {
fn from(err: rusqlite::Error) -> Self {
ChainIndexError::Sqlite(err.to_string())
}
}

/// Result type for chain indexing operations.
pub type ChainIndexResult<T> = Result<T, ChainIndexError>;
Loading
Loading