diff --git a/Cargo.toml b/Cargo.toml index 09ae4b03a..d5416d9f6 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls-ring"]} -bdk_wallet = { version = "2.2.0", default-features = false, features = ["std", "keys-bip39"]} +bdk_wallet = { version = "2.3.0", default-features = false, features = ["std", "keys-bip39"]} reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } rustls = { version = "0.23", default-features = false } diff --git a/src/builder.rs b/src/builder.rs index 510d86bdd..9965a0110 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -55,12 +55,14 @@ use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, - read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer, - write_node_metrics, + read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, + read_scorer, write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::liquidity::{ LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder, @@ -73,7 +75,8 @@ use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager, - MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore, + MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, Persister, + SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1235,6 +1238,22 @@ fn build_with_store_internal( }, }; + let pending_payment_store = match runtime + .block_on(async { read_pending_payments(&*kv_store, Arc::clone(&logger)).await }) + { + Ok(pending_payments) => Arc::new(PendingPaymentStore::new( + pending_payments, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read pending payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let wallet = Arc::new(Wallet::new( bdk_wallet, wallet_persister, @@ -1243,6 +1262,7 @@ fn build_with_store_internal( Arc::clone(&payment_store), Arc::clone(&config), Arc::clone(&logger), + Arc::clone(&pending_payment_store), )); // Initialize the KeysManager diff --git a/src/data_store.rs b/src/data_store.rs index d295ece51..ff09d9902 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -167,6 +167,10 @@ where })?; Ok(()) } + + pub(crate) fn contains_key(&self, id: &SO::Id) -> bool { + self.objects.lock().unwrap().contains_key(id) + } } #[cfg(test)] diff --git a/src/io/mod.rs b/src/io/mod.rs index 7afd5bd40..e080d39f7 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -78,3 +78,7 @@ pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer"; /// /// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices"; + +/// The pending payment information will be persisted under this prefix. +pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments"; +pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index 4ddc03b07..d2f70377b 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -46,6 +46,7 @@ use crate::io::{ NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger, Logger}; +use crate::payment::PendingPaymentDetails; use crate::peer_store::PeerStore; use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; @@ -626,6 +627,83 @@ pub(crate) fn read_bdk_wallet_change_set( Ok(Some(change_set)) } +/// Read previously persisted pending payments information from the store. +pub(crate) async fn read_pending_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + let mut res = Vec::new(); + + let mut stored_keys = KVStore::list( + &*kv_store, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await?; + + const BATCH_SIZE: usize = 50; + + let mut set = tokio::task::JoinSet::new(); + + // Fill JoinSet with tasks if possible + while set.len() < BATCH_SIZE && !stored_keys.is_empty() { + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + } + + while let Some(read_res) = set.join_next().await { + // Exit early if we get an IO error. + let reader = read_res + .map_err(|e| { + log_error!(logger, "Failed to read PendingPaymentDetails: {}", e); + set.abort_all(); + e + })? + .map_err(|e| { + log_error!(logger, "Failed to read PendingPaymentDetails: {}", e); + set.abort_all(); + e + })?; + + // Refill set for every finished future, if we still have something to do. + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + + // Handle result. + let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| { + log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e); + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Failed to deserialize PendingPaymentDetails", + ) + })?; + res.push(pending_payment); + } + + debug_assert!(set.is_empty()); + debug_assert!(stored_keys.is_empty()); + + Ok(res) +} + #[cfg(test)] mod tests { use super::read_or_generate_seed_file; diff --git a/src/payment/mod.rs b/src/payment/mod.rs index c82f35c8f..42b5aff3b 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -11,6 +11,7 @@ pub(crate) mod asynchronous; mod bolt11; mod bolt12; mod onchain; +pub(crate) mod pending_payment_store; mod spontaneous; pub(crate) mod store; mod unified; @@ -18,6 +19,7 @@ mod unified; pub use bolt11::Bolt11Payment; pub use bolt12::Bolt12Payment; pub use onchain::OnchainPayment; +pub use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs new file mode 100644 index 000000000..9daf5cb81 --- /dev/null +++ b/src/payment/pending_payment_store.rs @@ -0,0 +1,96 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::Txid; +use lightning::{impl_writeable_tlv_based, ln::channelmanager::PaymentId}; + +use crate::{ + data_store::{StorableObject, StorableObjectUpdate}, + payment::{store::PaymentDetailsUpdate, PaymentDetails}, +}; + +/// Represents a pending payment +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PendingPaymentDetails { + /// The full payment details + pub details: PaymentDetails, + /// Cached timestamp for efficient cleanup queries + pub created_at: u64, + /// Transaction IDs that have replaced or conflict with this payment. + pub conflicting_txids: Vec, +} + +impl PendingPaymentDetails { + pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec) -> Self { + Self { created_at: details.latest_update_timestamp, details, conflicting_txids } + } + + /// Convert to finalized payment for the main payment store + pub fn into_payment_details(self) -> PaymentDetails { + self.details + } +} + +impl_writeable_tlv_based!(PendingPaymentDetails, { + (0, details, required), + (2, created_at, required), + (4, conflicting_txids, optional_vec), +}); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct PendingPaymentDetailsUpdate { + pub id: PaymentId, + pub payment_update: Option, + pub conflicting_txids: Option>, +} + +impl StorableObject for PendingPaymentDetails { + type Id = PaymentId; + type Update = PendingPaymentDetailsUpdate; + + fn id(&self) -> Self::Id { + self.details.id + } + + fn update(&mut self, update: &Self::Update) -> bool { + let mut updated = false; + + // Update the underlying payment details if present + if let Some(payment_update) = &update.payment_update { + updated |= self.details.update(payment_update); + } + + if let Some(new_conflicting_txids) = &update.conflicting_txids { + if &self.conflicting_txids != new_conflicting_txids { + self.conflicting_txids = new_conflicting_txids.clone(); + updated = true; + } + } + + updated + } + + fn to_update(&self) -> Self::Update { + self.into() + } +} + +impl StorableObjectUpdate for PendingPaymentDetailsUpdate { + fn id(&self) -> ::Id { + self.id + } +} + +impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { + fn from(value: &PendingPaymentDetails) -> Self { + Self { + id: value.id(), + payment_update: Some(value.details.to_update()), + conflicting_txids: Some(value.conflicting_txids.clone()), + } + } +} diff --git a/src/types.rs b/src/types.rs index 2b7d3829a..14e0cc93e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,7 +38,7 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::RuntimeSpawner; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; -use crate::payment::PaymentDetails; +use crate::payment::{PaymentDetails, PendingPaymentDetails}; /// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the /// same time. @@ -609,3 +609,5 @@ impl From<&(u64, Vec)> for CustomTlvRecord { CustomTlvRecord { type_num: tlv.0, value: tlv.1.clone() } } } + +pub(crate) type PendingPaymentStore = DataStore>; diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 5fd7b3d8e..05c743bd9 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -12,6 +12,7 @@ use std::sync::{Arc, Mutex}; use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; use bdk_wallet::descriptor::ExtendedDescriptor; +use bdk_wallet::event::WalletEvent; #[allow(deprecated)] use bdk_wallet::SignOptions; use bdk_wallet::{Balance, KeychainKind, PersistedWallet, Update}; @@ -49,8 +50,10 @@ use crate::config::Config; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::payment::store::ConfirmationStatus; -use crate::payment::{PaymentDetails, PaymentDirection, PaymentStatus}; -use crate::types::{Broadcaster, PaymentStore}; +use crate::payment::{ + PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, +}; +use crate::types::{Broadcaster, PaymentStore, PendingPaymentStore}; use crate::Error; pub(crate) enum OnchainSendAmount { @@ -71,6 +74,7 @@ pub(crate) struct Wallet { payment_store: Arc, config: Arc, logger: Arc, + pending_payment_store: Arc, } impl Wallet { @@ -78,11 +82,20 @@ impl Wallet { wallet: bdk_wallet::PersistedWallet, wallet_persister: KVStoreWalletPersister, broadcaster: Arc, fee_estimator: Arc, payment_store: Arc, - config: Arc, logger: Arc, + config: Arc, logger: Arc, pending_payment_store: Arc, ) -> Self { let inner = Mutex::new(wallet); let persister = Mutex::new(wallet_persister); - Self { inner, persister, broadcaster, fee_estimator, payment_store, config, logger } + Self { + inner, + persister, + broadcaster, + fee_estimator, + payment_store, + config, + logger, + pending_payment_store, + } } pub(crate) fn get_full_scan_request(&self) -> FullScanRequest { @@ -114,15 +127,15 @@ impl Wallet { pub(crate) fn apply_update(&self, update: impl Into) -> Result<(), Error> { let mut locked_wallet = self.inner.lock().unwrap(); - match locked_wallet.apply_update(update) { - Ok(()) => { + match locked_wallet.apply_update_events(update) { + Ok(events) => { let mut locked_persister = self.persister.lock().unwrap(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed })?; - self.update_payment_store(&mut *locked_wallet).map_err(|e| { + self.update_payment_store(&mut *locked_wallet, events).map_err(|e| { log_error!(self.logger, "Failed to update payment store: {}", e); Error::PersistenceFailed })?; @@ -167,73 +180,159 @@ impl Wallet { fn update_payment_store<'a>( &self, locked_wallet: &'a mut PersistedWallet, + mut events: Vec, ) -> Result<(), Error> { - for wtx in locked_wallet.transactions() { - let id = PaymentId(wtx.tx_node.txid.to_byte_array()); - let txid = wtx.tx_node.txid; - let (payment_status, confirmation_status) = match wtx.chain_position { - bdk_chain::ChainPosition::Confirmed { anchor, .. } => { - let confirmation_height = anchor.block_id.height; + if events.is_empty() { + return Ok(()); + } + + // Sort events to ensure proper sequencing for data consistency: + // 1. TXReplaced (0) before TxUnconfirmed (1) - Critical for RBF handling + // When a transaction is replaced via RBF, both events fire. Processing + // TXReplaced first stores the replaced transaction, allowing TxUnconfirmed + // to detect and skip duplicate payment record creation. + // 2. TxConfirmed (2) before ChainTipChanged (3) - Ensures height accuracy + // ChainTipChanged updates block height. Processing TxConfirmed first ensures + // it references the correct height for confirmation depth calculations. + // 3. Other events follow in deterministic order for predictable processing + if events.len() > 1 { + events.sort_by_key(|e| match e { + WalletEvent::TxReplaced { .. } => 0, + WalletEvent::TxUnconfirmed { .. } => 1, + WalletEvent::TxConfirmed { .. } => 2, + WalletEvent::ChainTipChanged { .. } => 3, + WalletEvent::TxDropped { .. } => 4, + _ => 5, + }); + } + + for event in events { + match event { + WalletEvent::TxConfirmed { txid, tx, block_time, .. } => { let cur_height = locked_wallet.latest_checkpoint().height(); + let confirmation_height = block_time.block_id.height; let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 { PaymentStatus::Succeeded } else { PaymentStatus::Pending }; + let confirmation_status = ConfirmationStatus::Confirmed { - block_hash: anchor.block_id.hash, + block_hash: block_time.block_id.hash, height: confirmation_height, - timestamp: anchor.confirmation_time, + timestamp: block_time.confirmation_time, }; - (payment_status, confirmation_status) - }, - bdk_chain::ChainPosition::Unconfirmed { .. } => { - (PaymentStatus::Pending, ConfirmationStatus::Unconfirmed) - }, - }; - // TODO: It would be great to introduce additional variants for - // `ChannelFunding` and `ChannelClosing`. For the former, we could just - // take a reference to `ChannelManager` here and check against - // `list_channels`. But for the latter the best approach is much less - // clear: for force-closes/HTLC spends we should be good querying - // `OutputSweeper::tracked_spendable_outputs`, but regular channel closes - // (i.e., `SpendableOutputDescriptor::StaticOutput` variants) are directly - // spent to a wallet address. The only solution I can come up with is to - // create and persist a list of 'static pending outputs' that we could use - // here to determine the `PaymentKind`, but that's not really satisfactory, so - // we're punting on it until we can come up with a better solution. - let kind = crate::payment::PaymentKind::Onchain { txid, status: confirmation_status }; - let fee = locked_wallet.calculate_fee(&wtx.tx_node.tx).unwrap_or(Amount::ZERO); - let (sent, received) = locked_wallet.sent_and_received(&wtx.tx_node.tx); - let (direction, amount_msat) = if sent > received { - let direction = PaymentDirection::Outbound; - let amount_msat = Some( - sent.to_sat().saturating_sub(fee.to_sat()).saturating_sub(received.to_sat()) - * 1000, - ); - (direction, amount_msat) - } else { - let direction = PaymentDirection::Inbound; - let amount_msat = Some( - received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee.to_sat())) - * 1000, - ); - (direction, amount_msat) - }; - let fee_paid_msat = Some(fee.to_sat() * 1000); + let payment_id = self + .find_payment_by_txid(txid) + .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + let payment = self.create_payment_from_tx( + locked_wallet, + txid, + payment_id, + &tx, + payment_status, + confirmation_status, + ); - let payment = PaymentDetails::new( - id, - kind, - amount_msat, - fee_paid_msat, - direction, - payment_status, - ); + let pending_payment = + self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; + self.payment_store.insert_or_update(payment)?; + self.pending_payment_store.insert_or_update(pending_payment)?; + }, + WalletEvent::ChainTipChanged { new_tip, .. } => { + // Get all payments that are Pending with Confirmed status + let pending_payments: Vec = + self.pending_payment_store.list_filter(|p| { + p.details.status == PaymentStatus::Pending + && matches!( + p.details.kind, + PaymentKind::Onchain { + status: ConfirmationStatus::Confirmed { .. }, + .. + } + ) + }); + + for mut payment in pending_payments { + if let PaymentKind::Onchain { + status: ConfirmationStatus::Confirmed { height, .. }, + .. + } = payment.details.kind + { + let payment_id = payment.details.id; + if new_tip.height >= height + ANTI_REORG_DELAY - 1 { + payment.details.status = PaymentStatus::Succeeded; + self.payment_store.insert_or_update(payment.details)?; + self.pending_payment_store.remove(&payment_id)?; + } + } + } + }, + WalletEvent::TxUnconfirmed { txid, tx, old_block_time: None } => { + let payment_id = self + .find_payment_by_txid(txid) + .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + let payment = self.create_payment_from_tx( + locked_wallet, + txid, + payment_id, + &tx, + PaymentStatus::Pending, + ConfirmationStatus::Unconfirmed, + ); + let pending_payment = + self.create_pending_payment_from_tx(payment.clone(), Vec::new()); + self.payment_store.insert_or_update(payment)?; + self.pending_payment_store.insert_or_update(pending_payment)?; + }, + WalletEvent::TxReplaced { txid, conflicts, tx, .. } => { + let payment_id = self + .find_payment_by_txid(txid) + .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + // Collect all conflict txids + let conflict_txids: Vec = + conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect(); + + let payment = self.create_payment_from_tx( + locked_wallet, + txid, + payment_id, + &tx, + PaymentStatus::Pending, + ConfirmationStatus::Unconfirmed, + ); + let pending_payment_details = self + .create_pending_payment_from_tx(payment.clone(), conflict_txids.clone()); + + self.pending_payment_store.insert_or_update(pending_payment_details)?; + }, + WalletEvent::TxDropped { txid, tx } => { + let payment_id = self + .find_payment_by_txid(txid) + .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + let payment = self.create_payment_from_tx( + locked_wallet, + txid, + payment_id, + &tx, + PaymentStatus::Pending, + ConfirmationStatus::Unconfirmed, + ); + let pending_payment = + self.create_pending_payment_from_tx(payment.clone(), Vec::new()); + self.payment_store.insert_or_update(payment)?; + self.pending_payment_store.insert_or_update(pending_payment)?; + }, + _ => { + continue; + }, + }; } Ok(()) @@ -806,6 +905,79 @@ impl Wallet { Ok(tx) } + + fn create_payment_from_tx( + &self, locked_wallet: &PersistedWallet, txid: Txid, + payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, + confirmation_status: ConfirmationStatus, + ) -> PaymentDetails { + // TODO: It would be great to introduce additional variants for + // `ChannelFunding` and `ChannelClosing`. For the former, we could just + // take a reference to `ChannelManager` here and check against + // `list_channels`. But for the latter the best approach is much less + // clear: for force-closes/HTLC spends we should be good querying + // `OutputSweeper::tracked_spendable_outputs`, but regular channel closes + // (i.e., `SpendableOutputDescriptor::StaticOutput` variants) are directly + // spent to a wallet address. The only solution I can come up with is to + // create and persist a list of 'static pending outputs' that we could use + // here to determine the `PaymentKind`, but that's not really satisfactory, so + // we're punting on it until we can come up with a better solution. + + let kind = PaymentKind::Onchain { txid, status: confirmation_status }; + + let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); + let (sent, received) = locked_wallet.sent_and_received(tx); + let fee_sat = fee.to_sat(); + + let (direction, amount_msat) = if sent > received { + ( + PaymentDirection::Outbound, + Some( + (sent.to_sat().saturating_sub(fee_sat).saturating_sub(received.to_sat())) + * 1000, + ), + ) + } else { + ( + PaymentDirection::Inbound, + Some( + received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee_sat)) * 1000, + ), + ) + }; + + PaymentDetails::new( + payment_id, + kind, + amount_msat, + Some(fee_sat * 1000), + direction, + payment_status, + ) + } + + fn create_pending_payment_from_tx( + &self, payment: PaymentDetails, conflicting_txids: Vec, + ) -> PendingPaymentDetails { + PendingPaymentDetails::new(payment, conflicting_txids) + } + + fn find_payment_by_txid(&self, target_txid: Txid) -> Option { + let direct_payment_id = PaymentId(target_txid.to_byte_array()); + if self.pending_payment_store.contains_key(&direct_payment_id) { + return Some(direct_payment_id); + } + + if let Some(replaced_details) = self + .pending_payment_store + .list_filter(|p| p.conflicting_txids.contains(&target_txid)) + .first() + { + return Some(replaced_details.details.id); + } + + None + } } impl Listen for Wallet { @@ -834,9 +1006,9 @@ impl Listen for Wallet { ); } - match locked_wallet.apply_block(block, height) { - Ok(()) => { - if let Err(e) = self.update_payment_store(&mut *locked_wallet) { + match locked_wallet.apply_block_events(block, height) { + Ok(events) => { + if let Err(e) = self.update_payment_store(&mut *locked_wallet, events) { log_error!(self.logger, "Failed to update payment store: {}", e); return; }