Skip to content
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ itertools = "0.14.0"
metrics = "0.24.2"
openssl = { version = "0.10", features = ["vendored"] }
reqwest = "0.12.9"
schnellru = "0.2"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.137"
tracing = "0.1.41"
Expand Down
11 changes: 3 additions & 8 deletions crates/blobber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ alloy.workspace = true

init4-bin-base.workspace = true
signet-extract.workspace = true
signet-types.workspace = true
signet-zenith.workspace = true

reth.workspace = true
reth-chainspec.workspace = true
reth-transaction-pool = { workspace = true, optional = true }

futures-util.workspace = true
schnellru.workspace = true
serde.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand All @@ -31,11 +28,9 @@ thiserror.workspace = true
[dev-dependencies]
signet-constants = { workspace = true, features = ["test-utils"] }

reth-transaction-pool = { workspace = true, features = ["test-utils"] }

eyre.workspace = true
serde_json.workspace = true
tempfile.workspace = true

[features]
test-utils = ["signet-constants/test-utils", "dep:reth-transaction-pool", "reth-transaction-pool?/test-utils"]
test-utils = ["signet-constants/test-utils"]
178 changes: 65 additions & 113 deletions crates/blobber/src/blobs/builder.rs
Original file line number Diff line number Diff line change
@@ -1,140 +1,92 @@
use crate::{BlobCacher, BlobFetcher, BlobFetcherConfig};
use reth::transaction_pool::TransactionPool;
use url::Url;
use crate::{
AsyncBlobSource, BlobCacher, BlobFetcher, BlobFetcherConfig, BlobSource,
sources::{BeaconBlobSource, BlobExplorerSource, PylonBlobSource},
};

/// Errors that can occur while building the [`BlobFetcher`] with a
/// [`BlobFetcherBuilder`].
#[derive(Debug, thiserror::Error)]
#[derive(Debug, Clone, Copy, thiserror::Error)]
pub enum BuilderError {
/// The transaction pool was not provided.
#[error("transaction pool is required")]
MissingPool,
/// The explorer URL was not provided or could not be parsed.
#[error("explorer URL is required and must be valid")]
MissingExplorerUrl,
/// The URL provided was invalid.
#[error("invalid URL provided")]
Url(#[from] url::ParseError),
/// The client was not provided.
#[error("client is required")]
MissingClient,
/// The client failed to build.
#[error("failed to build client: {0}")]
Client(#[from] reqwest::Error),
/// The slot calculator was not provided.
#[error("slot calculator is required")]
MissingSlotCalculator,
}

/// Builder for the [`BlobFetcher`].
#[derive(Debug, Default, Clone)]
pub struct BlobFetcherBuilder<Pool> {
pool: Option<Pool>,
explorer_url: Option<String>,
client: Option<reqwest::Client>,
cl_url: Option<String>,
pylon_url: Option<String>,
///
/// Add synchronous and asynchronous blob sources, then call [`build`] to
/// produce a [`BlobFetcher`] or [`build_cache`] for a [`BlobCacher`].
///
/// [`build`]: BlobFetcherBuilder::build
/// [`build_cache`]: BlobFetcherBuilder::build_cache
#[derive(Default)]
pub struct BlobFetcherBuilder {
sync_sources: Vec<Box<dyn BlobSource>>,
async_sources: Vec<Box<dyn AsyncBlobSource>>,
}

impl<Pool> BlobFetcherBuilder<Pool> {
/// Set the transaction pool to use for the extractor.
pub fn with_pool<P2>(self, pool: P2) -> BlobFetcherBuilder<P2> {
BlobFetcherBuilder {
pool: Some(pool),
explorer_url: self.explorer_url,
client: self.client,
cl_url: self.cl_url,
pylon_url: self.pylon_url,
}
}

/// Set the transaction pool to use a mock test pool.
#[cfg(feature = "test-utils")]
pub fn with_test_pool(self) -> BlobFetcherBuilder<reth_transaction_pool::test_utils::TestPool> {
self.with_pool(reth_transaction_pool::test_utils::testing_pool())
}

/// Set the configuration for the CL url, pylon url, from the provided
/// [`BlobFetcherConfig`].
pub fn with_config(self, config: &BlobFetcherConfig) -> Result<Self, BuilderError> {
let this = self.with_explorer_url(config.blob_explorer_url());
let this =
if let Some(cl_url) = config.cl_url() { this.with_cl_url(cl_url)? } else { this };

if let Some(pylon_url) = config.pylon_url() {
this.with_pylon_url(pylon_url)
} else {
Ok(this)
}
impl core::fmt::Debug for BlobFetcherBuilder {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("BlobFetcherBuilder")
.field("sync_sources", &self.sync_sources.len())
.field("async_sources", &self.async_sources.len())
.finish()
}
}

/// Set the blob explorer URL to use for the extractor. This will be used
/// to construct a [`foundry_blob_explorers::Client`].
pub fn with_explorer_url(mut self, explorer_url: &str) -> Self {
self.explorer_url = Some(explorer_url.to_string());
impl BlobFetcherBuilder {
/// Adds a synchronous blob source.
pub fn with_source(mut self, source: impl BlobSource + 'static) -> Self {
self.sync_sources.push(Box::new(source));
self
}

/// Set the [`reqwest::Client`] to use for the extractor. This client will
/// be used to make requests to the blob explorer, and the CL and Pylon URLs
/// if provided.
pub fn with_client(mut self, client: reqwest::Client) -> Self {
self.client = Some(client);
/// Adds an asynchronous blob source.
pub fn with_async_source(mut self, source: impl AsyncBlobSource + 'static) -> Self {
self.async_sources.push(Box::new(source));
self
}

/// Set the [`reqwest::Client`] via a [reqwest::ClientBuilder]. This
/// function will immediately build the client and return an error if it
/// fails.
/// Configures standard remote sources from a [`BlobFetcherConfig`].
///
/// This client will be used to make requests to the blob explorer, and the
/// CL and Pylon URLs if provided.
pub fn with_client_builder(self, client: reqwest::ClientBuilder) -> Result<Self, BuilderError> {
client.build().map(|client| self.with_client(client)).map_err(Into::into)
}

/// Set the CL URL to use for the extractor.
pub fn with_cl_url(mut self, cl_url: &str) -> Result<Self, BuilderError> {
self.cl_url = Some(cl_url.to_string());
Ok(self)
}

/// Set the Pylon URL to use for the extractor.
pub fn with_pylon_url(mut self, pylon_url: &str) -> Result<Self, BuilderError> {
self.pylon_url = Some(pylon_url.to_string());
Ok(self)
/// This constructs a [`BlobExplorerSource`], and optionally a
/// [`BeaconBlobSource`] and [`PylonBlobSource`] depending on whether
/// the config provides CL and Pylon URLs.
pub fn with_config(
self,
config: &BlobFetcherConfig,
client: reqwest::Client,
) -> Result<Self, BuilderError> {
let explorer = foundry_blob_explorers::Client::new_with_client(
config.blob_explorer_url(),
client.clone(),
);
let this = self.with_async_source(BlobExplorerSource::new(explorer));

let this = match config.cl_url() {
Some(cl) => {
let url = url::Url::parse(cl)?;
this.with_async_source(BeaconBlobSource::new(client.clone(), url))
}
None => this,
};

match config.pylon_url() {
Some(pylon) => {
let url = url::Url::parse(pylon)?;
Ok(this.with_async_source(PylonBlobSource::new(client, url)))
}
None => Ok(this),
}
}
}

impl<Pool: TransactionPool> BlobFetcherBuilder<Pool> {
/// Build the [`BlobFetcher`] with the provided parameters.
pub fn build(self) -> Result<BlobFetcher<Pool>, BuilderError> {
let pool = self.pool.ok_or(BuilderError::MissingPool)?;

let explorer_url = self.explorer_url.ok_or(BuilderError::MissingExplorerUrl)?;

let cl_url = self.cl_url.map(parse_url).transpose()?;

let pylon_url = self.pylon_url.map(parse_url).transpose()?;

let client = self.client.ok_or(BuilderError::MissingClient)?;

let explorer =
foundry_blob_explorers::Client::new_with_client(explorer_url, client.clone());

Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url))
/// Build the [`BlobFetcher`].
pub fn build(self) -> BlobFetcher {
BlobFetcher::new(self.sync_sources, self.async_sources)
}

/// Build a [`BlobCacher`] with the provided parameters.
pub fn build_cache(self) -> Result<BlobCacher<Pool>, BuilderError>
where
Pool: 'static,
{
let fetcher = self.build()?;
Ok(BlobCacher::new(fetcher))
/// Build a [`BlobCacher`] wrapping the constructed [`BlobFetcher`].
pub fn build_cache(self) -> BlobCacher {
BlobCacher::new(self.build())
}
}

fn parse_url(url: String) -> Result<Url, BuilderError> {
Url::parse(url.as_ref()).map_err(BuilderError::Url)
}
108 changes: 12 additions & 96 deletions crates/blobber/src/blobs/cache.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::{BlobFetcher, BlobberError, BlobberResult, Blobs, FetchResult};
use crate::{BlobFetcher, BlobSpec, BlobberError, BlobberResult, Blobs, FetchResult};
use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
use alloy::eips::merge::EPOCH_SLOTS;
use alloy::primitives::{B256, Bytes, keccak256};
use core::fmt;
use reth::{network::cache::LruMap, transaction_pool::TransactionPool};
use schnellru::{ByLength, LruMap};
use signet_extract::ExtractedEvent;
use signet_zenith::Zenith::BlockSubmitted;
use signet_zenith::ZenithBlock;
Expand Down Expand Up @@ -144,22 +144,22 @@ impl<Coder> CacheHandle<Coder> {
}

/// Retrieves blobs and stores them in a cache for later use.
pub struct BlobCacher<Pool> {
fetcher: BlobFetcher<Pool>,
pub struct BlobCacher {
fetcher: BlobFetcher,

cache: Mutex<LruMap<(usize, B256), Blobs>>,
}

impl<Pool: fmt::Debug> fmt::Debug for BlobCacher<Pool> {
impl fmt::Debug for BlobCacher {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive()
}
}

impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
/// Creates a new `BlobCacher` with the provided extractor and cache size.
pub fn new(fetcher: BlobFetcher<Pool>) -> Self {
Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() }
impl BlobCacher {
/// Creates a new `BlobCacher` with the provided fetcher and cache size.
pub fn new(fetcher: BlobFetcher) -> Self {
Self { fetcher, cache: LruMap::new(ByLength::new(BLOB_CACHE_SIZE)).into() }
}

/// Fetches blobs for a given slot and transaction hash.
Expand All @@ -176,12 +176,14 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
return Ok(blobs.clone());
}

let spec = BlobSpec { tx_hash, slot, versioned_hashes };

// Cache miss, use the fetcher to retrieve blobs
// Retry fetching blobs up to `FETCH_RETRIES` times
for attempt in 1..=FETCH_RETRIES {
let Ok(blobs) = self
.fetcher
.fetch_blobs(slot, tx_hash, &versioned_hashes)
.fetch_blobs(&spec)
.instrument(debug_span!("fetch_blobs_loop", attempt))
.await
else {
Expand Down Expand Up @@ -229,89 +231,3 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
CacheHandle { sender, _coder: PhantomData }
}
}

#[cfg(test)]
mod tests {
use crate::BlobFetcher;

use super::*;
use alloy::{
consensus::{SidecarBuilder, SignableTransaction as _, TxEip2930},
eips::Encodable2718,
primitives::{TxKind, U256, bytes},
rlp::encode,
signers::{SignerSync, local::PrivateKeySigner},
};
use reth::primitives::Transaction;
use reth_transaction_pool::{
PoolTransaction, TransactionOrigin,
test_utils::{MockTransaction, testing_pool},
};
use signet_types::{constants::SignetSystemConstants, primitives::TransactionSigned};

#[tokio::test]
async fn test_fetch_from_pool() -> eyre::Result<()> {
let wallet = PrivateKeySigner::random();
let pool = testing_pool();

let test = signet_constants::KnownChains::Test;

let constants: SignetSystemConstants = test.try_into().unwrap();

let explorer_url = "https://api.holesky.blobscan.com/";
let client = reqwest::Client::builder().use_rustls_tls();

let tx = Transaction::Eip2930(TxEip2930 {
chain_id: 17001,
nonce: 2,
gas_limit: 50000,
gas_price: 1_500_000_000,
to: TxKind::Call(constants.host_zenith()),
value: U256::from(1_f64),
input: bytes!(""),
..Default::default()
});

let encoded_transactions =
encode(vec![sign_tx_with_key_pair(wallet.clone(), tx).encoded_2718()]);

let result = SidecarBuilder::<SimpleCoder>::from_slice(&encoded_transactions).build_4844();
assert!(result.is_ok());

let mut mock_transaction = MockTransaction::eip4844_with_sidecar(result.unwrap().into());
let transaction =
sign_tx_with_key_pair(wallet, Transaction::from(mock_transaction.clone()));

mock_transaction.set_hash(*transaction.hash());

pool.add_transaction(TransactionOrigin::Local, mock_transaction.clone()).await?;

// Spawn the cache
let cache = BlobFetcher::builder()
.with_pool(pool.clone())
.with_explorer_url(explorer_url)
.with_client_builder(client)
.unwrap()
.build_cache()?;
let handle = cache.spawn::<SimpleCoder>();

let got = handle
.fetch_blobs(
0, // this is ignored by the pool
*mock_transaction.hash(),
mock_transaction.blob_versioned_hashes().unwrap().to_owned(),
)
.await;
assert!(got.is_ok());

let got_blobs = got.unwrap();
assert!(got_blobs.len() == 1);

Ok(())
}

fn sign_tx_with_key_pair(wallet: PrivateKeySigner, tx: Transaction) -> TransactionSigned {
let signature = wallet.sign_hash_sync(&tx.signature_hash()).unwrap();
TransactionSigned::new_unhashed(tx, signature)
}
}
Loading