Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 95 additions & 5 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use alloy::providers::Provider;
use futures_util::future::try_join_all;
use init4_bin_base::{
deps::metrics::{counter, histogram},
perms::tx_cache::{BuilderTxCache, BuilderTxCacheError},
Expand All @@ -8,12 +10,13 @@ use signet_tx_cache::{
TxCacheError,
types::{BundleKey, CachedBundle},
};
use std::collections::{BTreeMap, BTreeSet};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};
use tracing::{Instrument, trace, trace_span, warn};
use tracing::{Instrument, debug_span, trace, trace_span, warn};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand Down Expand Up @@ -87,6 +90,96 @@ impl BundlePoller {
Ok(all_bundles)
}

/// Spawns a tokio task to check the nonces of all host transactions in a bundle
/// before sending it to the cache task via the outbound channel.
///
/// Fetches on-chain nonces concurrently for each unique signer, then validates
/// sequentially with a local nonce cache — mirroring the SDK's
/// `check_bundle_tx_list` pattern. Drops bundles where any host tx has a stale
/// or future nonce.
fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender<CachedBundle>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should re-use existing validity checks from crates/sim/src/cache/item.rs

let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id);
tokio::spawn(async move {
// Recover the bundle to get typed host tx requirements instead of
// manually decoding and recovering signers.
let recovered = match bundle.bundle.try_to_recovered() {
Ok(r) => r,
Err(error) => {
span_debug!(span, ?error, "Failed to recover bundle, dropping");
return;
}
};

// If no host transactions, forward directly
if recovered.host_txs().is_empty() {
if outbound.send(bundle).is_err() {
span_debug!(span, "Outbound channel closed, stopping nonce check task");
}
return;
}

let Ok(host_provider) =
crate::config().connect_host_provider().instrument(span.clone()).await
else {
span_debug!(span, "Failed to connect to host provider, stopping nonce check task");
return;
};

// Collect host tx requirements (signer + nonce) from the recovered bundle
let reqs: Vec<_> = recovered.host_tx_reqs().collect();

// Fetch on-chain nonces concurrently for each unique signer
let unique_signers: BTreeSet<_> = reqs.iter().map(|req| req.signer).collect();
let nonce_fetches = unique_signers.into_iter().map(|signer| {
let host_provider = &host_provider;
let span = &span;
async move {
host_provider
.get_transaction_count(signer)
.await
.map(|nonce| (signer, nonce))
.inspect_err(|error| {
span_debug!(
span,
?error,
sender = %signer,
"Failed to fetch nonce for sender, dropping bundle"
);
})
}
});

let Ok(fetched) = try_join_all(nonce_fetches).await else {
return;
};
let mut nonce_cache: BTreeMap<_, _> = fetched.into_iter().collect();

// Validate sequentially, checking exact nonce match and incrementing for
// same-signer sequential txs (mirroring check_bundle_tx_list in signet-sim).
for (idx, req) in reqs.iter().enumerate() {
let expected = nonce_cache.get(&req.signer).copied().expect("nonce must be cached");

if req.nonce != expected {
span_debug!(
span,
sender = %req.signer,
tx_nonce = req.nonce,
expected_nonce = expected,
idx,
"Dropping bundle: host tx nonce mismatch"
);
return;
}

nonce_cache.entry(req.signer).and_modify(|nonce| *nonce += 1);
}

if outbound.send(bundle).is_err() {
span_debug!(span, "Outbound channel closed, stopping nonce check task");
}
});
}

async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
loop {
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
Expand All @@ -106,10 +199,7 @@ impl BundlePoller {
counter!("signet.builder.cache.bundle_poll_count").increment(1);
if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
break;
}
Self::spawn_check_bundle_nonces(bundle, outbound.clone());
}
}

Expand Down