Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions minter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod cycles;
mod guard;
mod ledger;
pub mod lifecycle;
pub mod monitor;
mod numeric;
pub mod runtime;
mod signer;
Expand Down
4 changes: 4 additions & 0 deletions minter/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use candid::Principal;
use canlog::{Log, Sort};
use cksol_minter::consolidate::{DEPOSIT_CONSOLIDATION_DELAY, consolidate_deposits};
use cksol_minter::monitor::{MONITOR_SUBMITTED_TRANSACTIONS_DELAY, monitor_submitted_transactions};
use cksol_minter::{
address::lazy_get_schnorr_master_key, runtime::IcCanisterRuntime, state::read_state,
};
Expand Down Expand Up @@ -271,6 +272,9 @@ fn setup_timers() {
ic_cdk_timers::set_timer_interval(DEPOSIT_CONSOLIDATION_DELAY, async || {
consolidate_deposits(IcCanisterRuntime::new()).await;
});
ic_cdk_timers::set_timer_interval(MONITOR_SUBMITTED_TRANSACTIONS_DELAY, async || {
monitor_submitted_transactions(IcCanisterRuntime::new()).await;
});
}

fn main() {}
Expand Down
143 changes: 143 additions & 0 deletions minter/src/monitor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use crate::{
address::derivation_path,
guard::TimerGuard,
runtime::CanisterRuntime,
signer::sign_bytes,
state::{TaskType, audit::process_event, event::EventType, mutate_state, read_state},
transaction::{SubmitTransactionError, get_recent_blockhash, get_slot, submit_transaction},
};
use canlog::log;
use cksol_types_internal::log::Priority;
use ic_cdk::management_canister::SignCallError;
use icrc_ledger_types::icrc1::account::Account;
use sol_rpc_types::Slot;
use solana_message::Message;
use solana_signature::Signature;
use solana_transaction::Transaction;
use std::time::Duration;
use thiserror::Error;

#[cfg(test)]
mod tests;

pub const MONITOR_SUBMITTED_TRANSACTIONS_DELAY: Duration = Duration::from_secs(60);
const MAX_BLOCKHASH_AGE: Slot = 150;
const MAX_CONCURRENT_TRANSACTIONS: usize = 10;

pub async fn monitor_submitted_transactions<R: CanisterRuntime>(runtime: R) {
let _guard = match TimerGuard::new(TaskType::MonitorSubmittedTransactions) {
Ok(guard) => guard,
Err(_) => return,
};

if read_state(|state| state.submitted_transactions().is_empty()) {
return;
}

let current_slot = match get_slot(&runtime).await {
Ok(slot) => slot,
Err(e) => {
log!(Priority::Info, "Failed to get current slot: {e}");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to introduce the Error priority for such cases?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. IMO this is expected to happen once in a while (e.g. due to provider or connectivity issues) so it's not really an error per se. I would reserve the error priority for something that would need to be looked at. WDYT?

return;
}
};
let mut expired_transactions = read_state(|state| {
state
.submitted_transactions()
.iter()
.filter(|(_, tx)| tx.slot + MAX_BLOCKHASH_AGE < current_slot)
.map(|(sig, tx)| (*sig, tx.message.clone(), tx.signers.clone()))
.collect::<Vec<_>>()
});
Comment thread
lpahlavi marked this conversation as resolved.

while !expired_transactions.is_empty() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly to your comment in PR #48, don't we want to limit the number of resubmitted transactions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, I think we can assume that we don't run out of cycles, as long as we don't submit too many transactions in parallel.

// TODO DEFI-2670: Combine these two calls once `sol_rpc_client::SolRpcClient`
// `get_recent_block` method is released.
let new_blockhash = match get_recent_blockhash(&runtime).await {
Ok(blockhash) => blockhash,
Err(e) => {
log!(Priority::Info, "Failed to get recent blockhash: {e}");
return;
}
};
let new_slot = match get_slot(&runtime).await {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once estimate_recent_blockhash is extended to return also the slot, we will refactor this to single call for (new_slot, new_blockhash), is that correct? In that case, maybe add a TODO?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! Done.

Ok(slot) => slot,
Err(e) => {
log!(Priority::Info, "Failed to get slot: {e}");
return;
}
};

let batch_size = MAX_CONCURRENT_TRANSACTIONS.min(expired_transactions.len());
let futures = expired_transactions.drain(..batch_size).map(
async |(old_signature, message, signers)| match resubmit_transaction_with_new_blockhash(
&runtime,
old_signature,
message,
signers,
new_slot,
new_blockhash,
)
.await
{
Ok(new_signature) => log!(
Priority::Info,
"Resubmitted transaction {old_signature} with new signature {new_signature}"
),
Err(e) => log!(
Priority::Info,
"Failed to resubmit transaction {old_signature}: {e}"
),
},
);
futures::future::join_all(futures).await;
}
Comment thread
lpahlavi marked this conversation as resolved.
}

async fn resubmit_transaction_with_new_blockhash<R: CanisterRuntime>(
runtime: &R,
old_signature: Signature,
message: Message,
signers: Vec<Account>,
new_slot: Slot,
new_blockhash: solana_hash::Hash,
) -> Result<Signature, ResubmitError> {
let mut message = message;
message.recent_blockhash = new_blockhash;

let mut transaction = Transaction::new_unsigned(message);
transaction.signatures = sign_bytes(
signers.iter().map(derivation_path),
&runtime.signer(),
transaction.message_data(),
)
.await?;

let new_signature = transaction.signatures[0];

// Record the resubmission event before submitting the transaction to ensure we don't
// resubmit the same transaction twice in case of a panic during submission.
mutate_state(|state| {
process_event(
state,
EventType::ResubmittedTransaction {
old_signature,
new_signature,
new_slot,
},
runtime,
)
});
Comment thread
lpahlavi marked this conversation as resolved.

submit_transaction(runtime, transaction).await?;

Ok(new_signature)
}

#[derive(Debug, Error)]
enum ResubmitError {
#[error("failed to submit new transaction: {0}")]
Submit(#[from] SubmitTransactionError),
#[error("failed to sign transaction: {0}")]
Signing(#[from] SignCallError),
}
Loading
Loading