-
Notifications
You must be signed in to change notification settings - Fork 0
feat: resubmit expired transactions with new blockhash #56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7967294
64b0b8f
e5dc185
60daac3
e5bd1d8
4f85344
503153b
39855fc
ef13398
4e5743c
f7f93ee
ce5532a
a0cda7a
95a510e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| use crate::{ | ||
| address::derivation_path, | ||
| guard::TimerGuard, | ||
| runtime::CanisterRuntime, | ||
| signer::sign_bytes, | ||
| state::{TaskType, audit::process_event, event::EventType, mutate_state, read_state}, | ||
| transaction::{SubmitTransactionError, get_recent_blockhash, get_slot, submit_transaction}, | ||
| }; | ||
| use canlog::log; | ||
| use cksol_types_internal::log::Priority; | ||
| use ic_cdk::management_canister::SignCallError; | ||
| use icrc_ledger_types::icrc1::account::Account; | ||
| use sol_rpc_types::Slot; | ||
| use solana_message::Message; | ||
| use solana_signature::Signature; | ||
| use solana_transaction::Transaction; | ||
| use std::time::Duration; | ||
| use thiserror::Error; | ||
|
|
||
| #[cfg(test)] | ||
| mod tests; | ||
|
|
||
| pub const MONITOR_SUBMITTED_TRANSACTIONS_DELAY: Duration = Duration::from_secs(60); | ||
| const MAX_BLOCKHASH_AGE: Slot = 150; | ||
| const MAX_CONCURRENT_TRANSACTIONS: usize = 10; | ||
|
|
||
| pub async fn monitor_submitted_transactions<R: CanisterRuntime>(runtime: R) { | ||
| let _guard = match TimerGuard::new(TaskType::MonitorSubmittedTransactions) { | ||
| Ok(guard) => guard, | ||
| Err(_) => return, | ||
| }; | ||
|
|
||
| if read_state(|state| state.submitted_transactions().is_empty()) { | ||
| return; | ||
| } | ||
|
|
||
| let current_slot = match get_slot(&runtime).await { | ||
| Ok(slot) => slot, | ||
| Err(e) => { | ||
| log!(Priority::Info, "Failed to get current slot: {e}"); | ||
| return; | ||
| } | ||
| }; | ||
| let mut expired_transactions = read_state(|state| { | ||
| state | ||
| .submitted_transactions() | ||
| .iter() | ||
| .filter(|(_, tx)| tx.slot + MAX_BLOCKHASH_AGE < current_slot) | ||
| .map(|(sig, tx)| (*sig, tx.message.clone(), tx.signers.clone())) | ||
| .collect::<Vec<_>>() | ||
| }); | ||
|
lpahlavi marked this conversation as resolved.
|
||
|
|
||
| while !expired_transactions.is_empty() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly to your comment in PR #48, don't we want to limit the number of resubmitted transactions?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As discussed offline, I think we can assume that we don't run out of cycles, as long as we don't submit too many transactions in parallel. |
||
| // TODO DEFI-2670: Combine these two calls once `sol_rpc_client::SolRpcClient` | ||
| // `get_recent_block` method is released. | ||
| let new_blockhash = match get_recent_blockhash(&runtime).await { | ||
| Ok(blockhash) => blockhash, | ||
| Err(e) => { | ||
| log!(Priority::Info, "Failed to get recent blockhash: {e}"); | ||
| return; | ||
| } | ||
| }; | ||
| let new_slot = match get_slot(&runtime).await { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea! Done. |
||
| Ok(slot) => slot, | ||
| Err(e) => { | ||
| log!(Priority::Info, "Failed to get slot: {e}"); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| let batch_size = MAX_CONCURRENT_TRANSACTIONS.min(expired_transactions.len()); | ||
| let futures = expired_transactions.drain(..batch_size).map( | ||
| async |(old_signature, message, signers)| match resubmit_transaction_with_new_blockhash( | ||
| &runtime, | ||
| old_signature, | ||
| message, | ||
| signers, | ||
| new_slot, | ||
| new_blockhash, | ||
| ) | ||
| .await | ||
| { | ||
| Ok(new_signature) => log!( | ||
| Priority::Info, | ||
| "Resubmitted transaction {old_signature} with new signature {new_signature}" | ||
| ), | ||
| Err(e) => log!( | ||
| Priority::Info, | ||
| "Failed to resubmit transaction {old_signature}: {e}" | ||
| ), | ||
| }, | ||
| ); | ||
| futures::future::join_all(futures).await; | ||
| } | ||
|
lpahlavi marked this conversation as resolved.
|
||
| } | ||
|
|
||
| async fn resubmit_transaction_with_new_blockhash<R: CanisterRuntime>( | ||
| runtime: &R, | ||
| old_signature: Signature, | ||
| message: Message, | ||
| signers: Vec<Account>, | ||
| new_slot: Slot, | ||
| new_blockhash: solana_hash::Hash, | ||
| ) -> Result<Signature, ResubmitError> { | ||
| let mut message = message; | ||
| message.recent_blockhash = new_blockhash; | ||
|
|
||
| let mut transaction = Transaction::new_unsigned(message); | ||
| transaction.signatures = sign_bytes( | ||
| signers.iter().map(derivation_path), | ||
| &runtime.signer(), | ||
| transaction.message_data(), | ||
| ) | ||
| .await?; | ||
|
|
||
| let new_signature = transaction.signatures[0]; | ||
|
|
||
| // Record the resubmission event before submitting the transaction to ensure we don't | ||
| // resubmit the same transaction twice in case of a panic during submission. | ||
| mutate_state(|state| { | ||
| process_event( | ||
| state, | ||
| EventType::ResubmittedTransaction { | ||
| old_signature, | ||
| new_signature, | ||
| new_slot, | ||
| }, | ||
| runtime, | ||
| ) | ||
| }); | ||
|
lpahlavi marked this conversation as resolved.
|
||
|
|
||
| submit_transaction(runtime, transaction).await?; | ||
|
|
||
| Ok(new_signature) | ||
| } | ||
|
|
||
| #[derive(Debug, Error)] | ||
| enum ResubmitError { | ||
| #[error("failed to submit new transaction: {0}")] | ||
| Submit(#[from] SubmitTransactionError), | ||
| #[error("failed to sign transaction: {0}")] | ||
| Signing(#[from] SignCallError), | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to introduce the
Errorpriority for such cases?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure. IMO this is expected to happen once in a while (e.g. due to provider or connectivity issues) so it's not really an error per se. I would reserve the error priority for something that would need to be looked at. WDYT?