Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 20 additions & 39 deletions src/actors/consumer/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ impl RateLimiter {

/// Get current status
pub fn get_status(&self) -> (u32, u32) {
(
self.current_tokens.load(Ordering::Relaxed) as u32,
self.bucket_capacity,
)
(self.current_tokens.load(Ordering::Relaxed) as u32, self.bucket_capacity)
}
}

Expand Down Expand Up @@ -199,10 +196,7 @@ impl Consumer {
transactions_sending: Arc<AtomicU64>,
) {
let metadata = signed_txn.metadata;
debug!(
"Acquired permit, processing transaction: {:?}",
metadata.txn_id
);
debug!("Acquired permit, processing transaction: {:?}", metadata.txn_id);
transactions_sending.fetch_add(1, Ordering::Relaxed);

let mut last_error: Option<anyhow::Error> = None;
Expand All @@ -215,10 +209,7 @@ impl Consumer {
MAX_RETRIES,
metadata.txn_id
);
match dispatcher
.send_tx(signed_txn.bytes.clone(), metadata.txn_id)
.await
{
match dispatcher.send_tx(signed_txn.bytes.clone(), metadata.txn_id).await {
// Transaction sent successfully
Ok((tx_hash, rpc_url)) => {
tracing::debug!(
Expand Down Expand Up @@ -320,22 +311,22 @@ impl Consumer {
{
// The RPC already told us "nonce too low" - trust this directly.
// Try to get current nonce for better logging, but don't fail if we can't.
let (expect_nonce, actual_nonce, from_account) =
if let Ok(next_nonce) = dispatcher
let (expect_nonce, actual_nonce, from_account) = if let Ok(next_nonce) =
dispatcher
.provider(&url)
.await
.unwrap()
.get_pending_txn_count(metadata.from_account.as_ref().clone())
.await
{
(next_nonce, metadata.nonce, metadata.from_account.clone())
} else {
// Failed to get nonce, but RPC already said "nonce too low"
// Use 0 as placeholder - the important thing is NOT to retry
warn!("Nonce too low but failed to get current nonce for {:?}, treating as resolved", metadata.txn_id);
(0, metadata.nonce, metadata.from_account.clone())
};
{
(next_nonce, metadata.nonce, metadata.from_account.clone())
} else {
// Failed to get nonce, but RPC already said "nonce too low"
// Use 0 as placeholder - the important thing is NOT to retry
warn!("Nonce too low but failed to get current nonce for {:?}, treating as resolved", metadata.txn_id);
(0, metadata.nonce, metadata.from_account.clone())
};

monitor_addr.do_send(UpdateSubmissionResult {
metadata,
result: Arc::new(SubmissionResult::NonceTooLow {
Expand All @@ -348,7 +339,7 @@ impl Consumer {
send_time: Instant::now(),
signed_bytes: Arc::new(signed_txn.bytes.clone()),
});

// After encountering Nonce error, should stop retrying and return regardless
transactions_sending.fetch_sub(1, Ordering::Relaxed);
return;
Expand Down Expand Up @@ -394,13 +385,7 @@ impl Consumer {
max_tps: Option<u32>,
) -> Consumer {
let dispatcher = Arc::new(SimpleDispatcher::new(providers));
Consumer::new(
dispatcher,
max_concurrent_senders,
monitor_addr,
max_pool_size,
max_tps,
)
Consumer::new(dispatcher, max_concurrent_senders, monitor_addr, max_pool_size, max_tps)
}

/// Start transaction pool consumer
Expand All @@ -420,7 +405,7 @@ impl Consumer {
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
info!("Transaction pool consumer started with JoinSet and rate limiting (max_tps: {}).",
info!("Transaction pool consumer started with JoinSet and rate limiting (max_tps: {}).",
if rate_limiter.max_tps == 0 { "unlimited".to_string() } else { rate_limiter.max_tps.to_string() });
let mut in_flight_tasks = tokio::task::JoinSet::new();

Expand Down Expand Up @@ -502,9 +487,7 @@ impl Actor for Consumer {

fn started(&mut self, ctx: &mut Self::Context) {
// Register self with Monitor
self.monitor_addr.do_send(RegisterConsumer {
addr: ctx.address(),
});
self.monitor_addr.do_send(RegisterConsumer { addr: ctx.address() });

let rate_limiter = self.rate_limiter.clone();
let dispatcher = self.dispatcher.clone();
Expand Down Expand Up @@ -588,10 +571,8 @@ impl Handler<RetryTxn> for Consumer {
debug!("Retrying transaction: {:?}", msg.metadata.txn_id);

// Convert to SignedTxnWithMetadata and send through normal channel
let signed_txn = SignedTxnWithMetadata {
bytes: (*msg.signed_bytes).clone(),
metadata: msg.metadata,
};
let signed_txn =
SignedTxnWithMetadata { bytes: (*msg.signed_bytes).clone(), metadata: msg.metadata };

let sender = self.pool_sender.clone();
let pool_size = self.stats.pool_size.clone();
Expand Down
6 changes: 2 additions & 4 deletions src/actors/consumer/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ impl Dispatcher for SimpleDispatcher {
) -> std::result::Result<(TxHash, String), (anyhow::Error, String)> {
let provider = self.select_provider(txn_id);
let rpc_url = provider.rpc().as_ref().clone();
let tx_hash = provider
.send_raw_tx(bytes)
.await
.map_err(|e| (e, provider.rpc().as_ref().clone()))?;
let tx_hash =
provider.send_raw_tx(bytes).await.map_err(|e| (e, provider.rpc().as_ref().clone()))?;

Ok((tx_hash, rpc_url))
}
Expand Down
18 changes: 7 additions & 11 deletions src/actors/monitor/mempool_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use crate::{
eth::{EthHttpCli, MempoolStatus, TxPoolContent},
};



/// Action to take after analyzing mempool status
#[derive(Debug)]
pub enum MempoolAction {
Expand Down Expand Up @@ -105,7 +103,9 @@ impl MempoolTracker {

/// Identify accounts with nonce gaps from txpool_content
/// Returns list of addresses that need correction
pub fn identify_problematic_accounts(content: &TxPoolContent) -> Vec<alloy::primitives::Address> {
pub fn identify_problematic_accounts(
content: &TxPoolContent,
) -> Vec<alloy::primitives::Address> {
let mut problematic_accounts = Vec::new();

for (address, nonces) in &content.queued {
Expand All @@ -114,21 +114,17 @@ impl MempoolTracker {
// We check if they also have pending transactions.
// If they have NO pending transactions but HAVE queued, strictly implies a gap.
let has_pending = content.pending.contains_key(address);

// Check if there are any valid nonces in queued
if nonces.keys().any(|s| s.parse::<u64>().is_ok()) {
if nonces.keys().any(|s| s.parse::<u64>().is_ok()) {
if !has_pending {
problematic_accounts.push(*address);
problematic_accounts.push(*address);
}
}
}

tracing::info!(
"Identified {} accounts with likely nonce gaps",
problematic_accounts.len()
);
tracing::info!("Identified {} accounts with likely nonce gaps", problematic_accounts.len());

problematic_accounts
}
}

3 changes: 0 additions & 3 deletions src/actors/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,13 @@ pub struct PlanCompleted {
pub plan_id: PlanId,
}


#[derive(Message)]
#[rtype(result = "()")]
pub struct PlanFailed {
pub plan_id: PlanId,
pub reason: String,
}



/// Message to retry a timed-out transaction
#[derive(Message, Clone)]
#[rtype(result = "()")]
Expand Down
70 changes: 30 additions & 40 deletions src/actors/monitor/monitor_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use crate::actors::producer::Producer;
use crate::eth::EthHttpCli;
use crate::txn_plan::PlanId;

use super::txn_tracker::{BackpressureAction, PlanStatus, TxnTracker};
use super::mempool_tracker::MempoolAction;
use super::txn_tracker::{BackpressureAction, PlanStatus, TxnTracker};
use super::{
PlanCompleted, PlanFailed, RegisterConsumer, RegisterPlan, RegisterProducer,
ReportProducerStats, RetryTxn, Tick, UpdateSubmissionResult, CorrectNonces,
CorrectNonces, PlanCompleted, PlanFailed, RegisterConsumer, RegisterPlan, RegisterProducer,
ReportProducerStats, RetryTxn, Tick, UpdateSubmissionResult,
};
use crate::actors::{PauseProducer, ResumeProducer};

Expand All @@ -30,7 +30,7 @@ struct LogStats;
pub struct Monitor {
/// Registered Producer address
producer_addr: Option<Addr<Producer>>,
/// Registered Consumer address
/// Registered Consumer address
consumer_addr: Option<Addr<Consumer>>,
/// Transaction and plan tracker
txn_tracker: TxnTracker,
Expand All @@ -39,7 +39,6 @@ pub struct Monitor {
}

impl Monitor {

pub fn new_with_clients(
clients: Vec<std::sync::Arc<EthHttpCli>>,
max_pool_size: usize,
Expand All @@ -50,12 +49,7 @@ impl Monitor {
consumer_addr: None,
txn_tracker: TxnTracker::new(clients.clone(), sampling_policy),
mempool_tracker: MempoolTracker::new(max_pool_size),
clients: Arc::new(
clients
.into_iter()
.map(|client| (client.rpc(), client))
.collect(),
),
clients: Arc::new(clients.into_iter().map(|client| (client.rpc(), client)).collect()),
}
}

Expand Down Expand Up @@ -102,7 +96,7 @@ impl Actor for Monitor {
match act.mempool_tracker.process_pool_status(res, producer_addr) {
Ok((pending, queued, action)) => {
act.txn_tracker.update_mempool_stats(pending, queued);

// Handle nonce correction if needed
if matches!(action, MempoolAction::NeedsNonceCorrection) {
let clients = act.clients.clone();
Expand Down Expand Up @@ -200,24 +194,27 @@ impl Handler<UpdateSubmissionResult> for Monitor {
// If the transaction failed submission, retry it endlessly to prevent nonce gaps
// and premature plan completion. Do NOT tell TxnTracker about the failure yet.
tracing::warn!(
"Transaction failed submission (ErrorWithRetry). Retrying via Consumer. plan_id={}, tx_hash={:?}",
"Transaction failed submission (ErrorWithRetry). Retrying via Consumer. plan_id={}, tx_hash={:?}",
msg.metadata.plan_id,
msg.metadata.txn_id
);

if let Some(consumer) = &self.consumer_addr {
consumer.do_send(RetryTxn {
signed_bytes: msg.signed_bytes.clone(),
metadata: msg.metadata.clone(),
});
} else {
tracing::error!("Cannot retry transaction, no consumer address: {:?}", msg.metadata.txn_id);
tracing::error!(
"Cannot retry transaction, no consumer address: {:?}",
msg.metadata.txn_id
);
// Fallback to tracker if no consumer (will mark as failed)
self.txn_tracker.handle_submission_result(&msg);
}
}
_ => {
self.txn_tracker.handle_submission_result(&msg);
self.txn_tracker.handle_submission_result(&msg);
}
}
}
Expand Down Expand Up @@ -246,24 +243,20 @@ impl Handler<Tick> for Monitor {
let tasks = self.txn_tracker.perform_sampling_check();
let consumer_addr = self.consumer_addr.clone();
if !tasks.is_empty() {
ctx.spawn(
future::join_all(tasks)
.into_actor(self)
.map(move |results, act, _ctx| {
// Process results and get retry queue
let retry_queue = act.txn_tracker.handle_receipt_result(results);

// 3. Send retries to consumer
if let Some(consumer) = &consumer_addr {
for retry_txn in retry_queue {
consumer.do_send(RetryTxn {
signed_bytes: retry_txn.signed_bytes,
metadata: retry_txn.metadata,
});
}
}
}),
);
ctx.spawn(future::join_all(tasks).into_actor(self).map(move |results, act, _ctx| {
// Process results and get retry queue
let retry_queue = act.txn_tracker.handle_receipt_result(results);

// 3. Send retries to consumer
if let Some(consumer) = &consumer_addr {
for retry_txn in retry_queue {
consumer.do_send(RetryTxn {
signed_bytes: retry_txn.signed_bytes,
metadata: retry_txn.metadata,
});
}
}
}));
}

// Check completion status of all plans
Expand Down Expand Up @@ -301,26 +294,23 @@ impl Handler<ProduceTxns> for Monitor {
type Result = ();

fn handle(&mut self, msg: ProduceTxns, _ctx: &mut Self::Context) {
self.txn_tracker
.handler_produce_txns(msg.plan_id, msg.count);
self.txn_tracker.handler_produce_txns(msg.plan_id, msg.count);
}
}

impl Handler<PlanProduced> for Monitor {
type Result = ();

fn handle(&mut self, msg: PlanProduced, _ctx: &mut Self::Context) {
self.txn_tracker
.handle_plan_produced(msg.plan_id, msg.count);
self.txn_tracker.handle_plan_produced(msg.plan_id, msg.count);
}
}

impl Handler<ReportProducerStats> for Monitor {
type Result = ();

fn handle(&mut self, msg: ReportProducerStats, _ctx: &mut Self::Context) {
self.txn_tracker
.update_producer_stats(msg.ready_accounts, msg.sending_txns);
self.txn_tracker.update_producer_stats(msg.ready_accounts, msg.sending_txns);
}
}

Expand Down
Loading
Loading