diff --git a/minter/src/lib.rs b/minter/src/lib.rs index 34a690c6..a2ad246b 100644 --- a/minter/src/lib.rs +++ b/minter/src/lib.rs @@ -4,6 +4,7 @@ mod cycles; mod guard; mod ledger; pub mod lifecycle; +pub mod monitor; mod numeric; pub mod runtime; mod signer; diff --git a/minter/src/main.rs b/minter/src/main.rs index 84650591..88af3bec 100644 --- a/minter/src/main.rs +++ b/minter/src/main.rs @@ -1,6 +1,7 @@ use candid::Principal; use canlog::{Log, Sort}; use cksol_minter::consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits}; +use cksol_minter::monitor::{MONITOR_SUBMITTED_TRANSACTIONS_DELAY, monitor_submitted_transactions}; use cksol_minter::{ address::lazy_get_schnorr_master_key, runtime::IcCanisterRuntime, state::read_state, }; @@ -271,6 +272,9 @@ fn setup_timers() { ic_cdk_timers::set_timer_interval(DEPOSIT_CONSOLIDATION_DELAY, async || { consolidate_deposits(IcCanisterRuntime::new()).await; }); + ic_cdk_timers::set_timer_interval(MONITOR_SUBMITTED_TRANSACTIONS_DELAY, async || { + monitor_submitted_transactions(IcCanisterRuntime::new()).await; + }); } fn main() {} diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs new file mode 100644 index 00000000..97992193 --- /dev/null +++ b/minter/src/monitor/mod.rs @@ -0,0 +1,143 @@ +use crate::{ + address::derivation_path, + guard::TimerGuard, + runtime::CanisterRuntime, + signer::sign_bytes, + state::{TaskType, audit::process_event, event::EventType, mutate_state, read_state}, + transaction::{SubmitTransactionError, get_recent_blockhash, get_slot, submit_transaction}, +}; +use canlog::log; +use cksol_types_internal::log::Priority; +use ic_cdk::management_canister::SignCallError; +use icrc_ledger_types::icrc1::account::Account; +use sol_rpc_types::Slot; +use solana_message::Message; +use solana_signature::Signature; +use solana_transaction::Transaction; +use std::time::Duration; +use thiserror::Error; + +#[cfg(test)] +mod tests; + +pub const MONITOR_SUBMITTED_TRANSACTIONS_DELAY: Duration = Duration::from_secs(60); +const MAX_BLOCKHASH_AGE: Slot = 150; +const MAX_CONCURRENT_TRANSACTIONS: usize = 10; + +pub async fn monitor_submitted_transactions(runtime: R) { + let _guard = match TimerGuard::new(TaskType::MonitorSubmittedTransactions) { + Ok(guard) => guard, + Err(_) => return, + }; + + if read_state(|state| state.submitted_transactions().is_empty()) { + return; + } + + let current_slot = match get_slot(&runtime).await { + Ok(slot) => slot, + Err(e) => { + log!(Priority::Info, "Failed to get current slot: {e}"); + return; + } + }; + let mut expired_transactions = read_state(|state| { + state + .submitted_transactions() + .iter() + .filter(|(_, tx)| tx.slot + MAX_BLOCKHASH_AGE < current_slot) + .map(|(sig, tx)| (*sig, tx.message.clone(), tx.signers.clone())) + .collect::>() + }); + + while !expired_transactions.is_empty() { + // TODO DEFI-2670: Combine these two calls once `sol_rpc_client::SolRpcClient` + // `get_recent_block` method is released. + let new_blockhash = match get_recent_blockhash(&runtime).await { + Ok(blockhash) => blockhash, + Err(e) => { + log!(Priority::Info, "Failed to get recent blockhash: {e}"); + return; + } + }; + let new_slot = match get_slot(&runtime).await { + Ok(slot) => slot, + Err(e) => { + log!(Priority::Info, "Failed to get slot: {e}"); + return; + } + }; + + let batch_size = MAX_CONCURRENT_TRANSACTIONS.min(expired_transactions.len()); + let futures = expired_transactions.drain(..batch_size).map( + async |(old_signature, message, signers)| match resubmit_transaction_with_new_blockhash( + &runtime, + old_signature, + message, + signers, + new_slot, + new_blockhash, + ) + .await + { + Ok(new_signature) => log!( + Priority::Info, + "Resubmitted transaction {old_signature} with new signature {new_signature}" + ), + Err(e) => log!( + Priority::Info, + "Failed to resubmit transaction {old_signature}: {e}" + ), + }, + ); + futures::future::join_all(futures).await; + } +} + +async fn resubmit_transaction_with_new_blockhash( + runtime: &R, + old_signature: Signature, + message: Message, + signers: Vec, + new_slot: Slot, + new_blockhash: solana_hash::Hash, +) -> Result { + let mut message = message; + message.recent_blockhash = new_blockhash; + + let mut transaction = Transaction::new_unsigned(message); + transaction.signatures = sign_bytes( + signers.iter().map(derivation_path), + &runtime.signer(), + transaction.message_data(), + ) + .await?; + + let new_signature = transaction.signatures[0]; + + // Record the resubmission event before submitting the transaction to ensure we don't + // resubmit the same transaction twice in case of a panic during submission. + mutate_state(|state| { + process_event( + state, + EventType::ResubmittedTransaction { + old_signature, + new_signature, + new_slot, + }, + runtime, + ) + }); + + submit_transaction(runtime, transaction).await?; + + Ok(new_signature) +} + +#[derive(Debug, Error)] +enum ResubmitError { + #[error("failed to submit new transaction: {0}")] + Submit(#[from] SubmitTransactionError), + #[error("failed to sign transaction: {0}")] + Signing(#[from] SignCallError), +} diff --git a/minter/src/monitor/tests.rs b/minter/src/monitor/tests.rs new file mode 100644 index 00000000..08cc4958 --- /dev/null +++ b/minter/src/monitor/tests.rs @@ -0,0 +1,233 @@ +use super::{MAX_BLOCKHASH_AGE, monitor_submitted_transactions}; +use crate::{ + address::derive_public_key, + state::{TaskType, audit::process_event, event::EventType, mutate_state, read_state}, + test_fixtures::{ + EventsAssert, MINTER_ACCOUNT, init_schnorr_master_key, init_state, + runtime::TestCanisterRuntime, + }, +}; +use assert_matches::assert_matches; +use ic_ed25519::{PocketIcMasterPublicKeyId, PublicKey}; +use sol_rpc_types::{ConfirmedBlock, MultiRpcResult, RpcError, Slot}; +use solana_hash::Hash; +use solana_message::Message; +use solana_signature::Signature; + +type SlotResult = MultiRpcResult; +type BlockResult = MultiRpcResult; +type SendTransactionResult = MultiRpcResult; + +#[tokio::test] +async fn should_return_early_if_no_transactions_to_resubmit() { + setup(); + + let runtime = TestCanisterRuntime::new().with_increasing_time(); + + monitor_submitted_transactions(runtime).await; + + EventsAssert::assert_no_events_recorded(); +} + +#[tokio::test] +async fn should_return_early_if_task_already_active() { + setup(); + add_submitted_transaction(Signature::from([0x01; 64]), 10); + + mutate_state(|s| { + s.active_tasks_mut() + .insert(TaskType::MonitorSubmittedTransactions); + }); + + monitor_submitted_transactions(TestCanisterRuntime::new()).await; + + // Only SubmittedTransaction event from setup, no resubmission events + EventsAssert::from_recorded() + .expect_event(|e| assert_matches!(e, EventType::SubmittedTransaction { .. })) + .assert_no_more_events(); +} + +#[tokio::test] +async fn should_return_early_if_fetching_current_slot_fails() { + setup(); + add_submitted_transaction(Signature::from([0x01; 64]), 10); + + let error = SlotResult::Consistent(Err(RpcError::ValidationError("Error".to_string()))); + let runtime = TestCanisterRuntime::new() + .add_stub_response(error.clone()) + .add_stub_response(error.clone()) + .add_stub_response(error); + + monitor_submitted_transactions(runtime).await; + + // Only SubmittedTransaction event from setup, no resubmission events + EventsAssert::from_recorded() + .expect_event(|e| assert_matches!(e, EventType::SubmittedTransaction { .. })) + .assert_no_more_events(); +} + +#[tokio::test] +async fn should_not_resubmit_if_transaction_not_expired() { + setup(); + + let original_slot = 100; + add_submitted_transaction(Signature::from([0x01; 64]), original_slot); + + // Current slot is within MAX_BLOCKHASH_AGE of original slot + let current_slot = original_slot + MAX_BLOCKHASH_AGE - 1; + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + .add_stub_response(SlotResult::Consistent(Ok(current_slot))); + + monitor_submitted_transactions(runtime).await; + + // Only SubmittedTransaction event from setup, no resubmission + EventsAssert::from_recorded() + .expect_event(|e| assert_matches!(e, EventType::SubmittedTransaction { .. })) + .assert_no_more_events(); + + // Transaction should still be in submitted_transactions + read_state(|s| { + assert_eq!(s.submitted_transactions().len(), 1); + }); +} + +#[tokio::test] +async fn should_resubmit_single_expired_transaction() { + setup(); + + let old_signature = Signature::from([0x01; 64]); + let original_slot = 10; + add_submitted_transaction(old_signature, original_slot); + + // Current slot is past MAX_BLOCKHASH_AGE + let current_slot = original_slot + MAX_BLOCKHASH_AGE + 1; + let new_slot = current_slot + 5; + let new_signature = Signature::from([0xAA; 64]); + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + // get_slot for current slot check + .add_stub_response(SlotResult::Consistent(Ok(current_slot))) + // get_recent_blockhash calls + .add_stub_response(SlotResult::Consistent(Ok(new_slot))) + .add_stub_response(BlockResult::Consistent(Ok(block()))) + // get_slot for new slot + .add_stub_response(SlotResult::Consistent(Ok(new_slot))) + // submit_transaction + .add_stub_response(SendTransactionResult::Consistent(Ok(new_signature.into()))) + // Signature for re-signing (only fee payer since message has no other signers) + .add_signature(new_signature.into()); + + monitor_submitted_transactions(runtime).await; + + EventsAssert::from_recorded() + .expect_event(|e| assert_matches!(e, EventType::SubmittedTransaction { .. })) + .expect_event(|e| { + assert_matches!( + e, + EventType::ResubmittedTransaction { + old_signature: old_sig, + new_signature: new_sig, + new_slot: slot, + } if old_sig == old_signature && new_sig == new_signature && slot == new_slot + ) + }) + .assert_no_more_events(); + + // Old transaction should be replaced with new one + read_state(|s| { + assert_eq!(s.submitted_transactions().len(), 1); + assert!(s.submitted_transactions().contains_key(&new_signature)); + assert!(!s.submitted_transactions().contains_key(&old_signature)); + }); +} + +#[tokio::test] +async fn should_record_event_even_if_submission_fails() { + setup(); + + let old_signature = Signature::from([0x01; 64]); + let original_slot = 10; + add_submitted_transaction(old_signature, original_slot); + + let current_slot = original_slot + MAX_BLOCKHASH_AGE + 1; + let new_slot = current_slot + 5; + let new_signature = Signature::from([0xAA; 64]); + + let runtime = TestCanisterRuntime::new() + .with_increasing_time() + // get_slot for current slot check + .add_stub_response(SlotResult::Consistent(Ok(current_slot))) + // get_recent_blockhash calls + .add_stub_response(SlotResult::Consistent(Ok(new_slot))) + .add_stub_response(BlockResult::Consistent(Ok(block()))) + // get_slot for new slot + .add_stub_response(SlotResult::Consistent(Ok(new_slot))) + // submit_transaction fails + .add_stub_response(SendTransactionResult::Inconsistent(vec![])) + .add_signature(new_signature.into()); + + monitor_submitted_transactions(runtime).await; + + // ResubmittedTransaction event should still be recorded + EventsAssert::from_recorded() + .expect_event(|e| assert_matches!(e, EventType::SubmittedTransaction { .. })) + .expect_event(|e| { + assert_matches!( + e, + EventType::ResubmittedTransaction { + old_signature: old_sig, + new_signature: new_sig, + new_slot: slot, + } if old_sig == old_signature && new_sig == new_signature && slot == new_slot + ) + }) + .assert_no_more_events(); +} + +fn setup() { + init_state(); + init_schnorr_master_key(); +} + +fn minter_address() -> solana_address::Address { + use crate::state::SchnorrPublicKey; + let master_key = SchnorrPublicKey { + public_key: PublicKey::pocketic_key(PocketIcMasterPublicKeyId::Key1), + chain_code: [1; 32], + }; + derive_public_key(&master_key, vec![]) + .serialize_raw() + .into() +} + +fn add_submitted_transaction(signature: Signature, slot: Slot) { + let message = Message::new_with_blockhash(&[], Some(&minter_address()), &Hash::default()); + mutate_state(|state| { + process_event( + state, + EventType::SubmittedTransaction { + signature, + transaction: message, + signers: vec![MINTER_ACCOUNT], + slot, + }, + &TestCanisterRuntime::new().with_increasing_time(), + ) + }); +} + +fn block() -> ConfirmedBlock { + ConfirmedBlock { + previous_blockhash: Default::default(), + blockhash: Hash::from([0x42; 32]).into(), + parent_slot: 0, + block_time: None, + block_height: None, + signatures: None, + rewards: None, + num_reward_partitions: None, + transactions: None, + } +} diff --git a/minter/src/sol_transfer/mod.rs b/minter/src/sol_transfer/mod.rs index 06f9b178..646efc7c 100644 --- a/minter/src/sol_transfer/mod.rs +++ b/minter/src/sol_transfer/mod.rs @@ -13,6 +13,9 @@ use solana_transaction::{Instruction, Message, Transaction}; use std::{collections::BTreeMap, iter}; use thiserror::Error; +#[cfg(test)] +mod tests; + pub const MAX_SIGNATURES: u64 = 10; pub const MAX_TX_SIZE: usize = 1_232; const BYTES_PER_SIGNATURE: usize = 64; @@ -25,9 +28,6 @@ pub enum CreateTransferError { SigningFailed(SignCallError), } -#[cfg(test)] -mod tests; - /// Creates a signed Solana transaction that transfers lamports from /// each minter-controlled address (identified by its account) to the /// destination account's derived address. diff --git a/minter/src/state/mod.rs b/minter/src/state/mod.rs index 876d3cd4..586d8324 100644 --- a/minter/src/state/mod.rs +++ b/minter/src/state/mod.rs @@ -147,6 +147,10 @@ impl State { &self.funds_to_consolidate } + pub fn submitted_transactions(&self) -> &BTreeMap { + &self.submitted_transactions + } + pub fn deposit_status(&self, deposit_id: &DepositId) -> Option { if self.quarantined_deposits.contains_key(deposit_id) { return Some(DepositStatus::Quarantined(deposit_id.signature.into())); @@ -505,6 +509,7 @@ pub struct MintedDeposit { pub enum TaskType { DepositConsolidation, Mint, + MonitorSubmittedTransactions, } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/minter/src/test_fixtures/mod.rs b/minter/src/test_fixtures/mod.rs index 443afd0e..c758ac88 100644 --- a/minter/src/test_fixtures/mod.rs +++ b/minter/src/test_fixtures/mod.rs @@ -30,7 +30,7 @@ pub const DEPOSIT_FEE: Lamport = 10_000_000; // 0.01 SOL pub const WITHDRAWAL_FEE: Lamport = 5_000_000; // 0.005 SOL pub const MINIMUM_WITHDRAWAL_AMOUNT: Lamport = 10_000_000; // 0.01 SOL pub const MINTER_ACCOUNT: Account = Account { - owner: Principal::from_slice(&[1u8; 10]), + owner: runtime::TEST_CANISTER_ID, subaccount: None, }; pub const MINIMUM_DEPOSIT_AMOUNT: Lamport = 10_000_000; // 0.01 SOL