Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 120 additions & 61 deletions crates/bitcoind_rpc/src/bip158.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki

use core::fmt::Debug;

use bdk_core::bitcoin;
use bdk_core::CheckPoint;
use bdk_core::FromBlockHeader;
use bdk_core::ToBlockHash;
use bitcoin::block::Header;
use bitcoin::hashes::Hash;
use bitcoin::pow::CompactTarget;
use bitcoin::BlockHash;
use bitcoin::{bip158::BlockFilter, Block, ScriptBuf};
use bitcoincore_rpc;
Expand All @@ -28,33 +35,65 @@ use bitcoincore_rpc::{json::GetBlockHeaderResult, RpcApi};
/// occur. `FilterIter` will continue to yield events until it reaches the latest chain tip.
/// Events contain the updated checkpoint `cp` which may be incorporated into the local chain
/// state to stay in sync with the tip.
#[derive(Debug)]
pub struct FilterIter<'a> {
pub struct FilterIter<'a, D = BlockHash> {
/// RPC client
client: &'a bitcoincore_rpc::Client,
/// SPK inventory
spks: Vec<ScriptBuf>,
/// checkpoint
cp: CheckPoint<BlockHash>,
cp: CheckPoint<D>,
/// Header info, contains the prev and next hashes for each header.
header: Option<GetBlockHeaderResult>,
/// Closure to convert a block header into checkpoint data `D`.
to_data: Box<dyn Fn(Header) -> D>,
}

impl<'a> FilterIter<'a> {
/// Construct [`FilterIter`] with checkpoint, RPC client and SPKs.
pub fn new(
impl<D: Debug> core::fmt::Debug for FilterIter<'_, D> {
    /// Debug-formats the iterator state.
    ///
    /// The RPC client and the `to_data` closure are not `Debug`, so the output is
    /// finished with `finish_non_exhaustive` to signal the omitted fields.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut dbg = f.debug_struct("FilterIter");
        dbg.field("spks", &self.spks);
        dbg.field("cp", &self.cp);
        dbg.field("header", &self.header);
        dbg.finish_non_exhaustive()
    }
}

impl<'a, D: 'static> FilterIter<'a, D> {
    /// Construct [`FilterIter`] with a custom closure to convert block headers into checkpoint
    /// data.
    ///
    /// Use [`new`](Self::new) for the common case where `D` implements [`FromBlockHeader`].
    pub fn new_with(
        client: &'a bitcoincore_rpc::Client,
        cp: CheckPoint<D>,
        spks: impl IntoIterator<Item = ScriptBuf>,
        to_data: impl Fn(Header) -> D + 'static,
    ) -> Self {
        // Materialize the SPK inventory up front; the filter matcher iterates it per block.
        let spks: Vec<ScriptBuf> = spks.into_iter().collect();
        Self {
            client,
            cp,
            spks,
            header: None,
            to_data: Box::new(to_data),
        }
    }
}

impl<'a, D: FromBlockHeader + 'static> FilterIter<'a, D> {
    /// Construct [`FilterIter`] with checkpoint, RPC client and SPKs.
    ///
    /// Checkpoint data is derived from each header via [`FromBlockHeader`]; use
    /// [`new_with`](Self::new_with) to supply a custom conversion instead.
    pub fn new(
        client: &'a bitcoincore_rpc::Client,
        cp: CheckPoint<D>,
        spks: impl IntoIterator<Item = ScriptBuf>,
    ) -> Self {
        Self::new_with(client, cp, spks, |header| D::from_blockheader(header))
    }
}

impl<'a, D> FilterIter<'a, D>
where
D: ToBlockHash + Clone + Debug,
{
/// Return the agreement header with the remote node.
///
/// Error if no agreement header is found.
Expand All @@ -69,13 +108,76 @@ impl<'a> FilterIter<'a> {
}
Err(Error::ReorgDepthExceeded)
}

/// Advance the scan by one block.
///
/// Returns `Ok(Some(event))` with the updated checkpoint (and the full block if its
/// BIP 158 filter matched any of our SPKs), or `Ok(None)` once the remote tip is reached.
///
/// Flow: resume from the cached header (or locate the agreement base via `find_base`),
/// step forward via `next_block_hash`, rewind past any reorged-out headers, extend the
/// checkpoint with data derived from the reconstructed header, then test the filter.
fn try_next(&mut self) -> Result<Option<Event<D>>, Error> {
    let mut cp = self.cp.clone();

    let header = match self.header.take() {
        Some(header) => header,
        // If no header is cached we need to locate a base of the local
        // checkpoint from which the scan may proceed.
        None => self.find_base()?,
    };

    // No successor hash means we are at the remote tip: the scan is done.
    let mut next_hash = match header.next_block_hash {
        Some(hash) => hash,
        None => return Ok(None),
    };

    let mut next_header = self.client.get_block_header_info(&next_hash)?;

    // In case of a reorg, rewind by fetching headers of previous hashes until we find
    // one with enough confirmations.
    // (Core reports negative confirmations for headers that are no longer in the
    // active chain.)
    while next_header.confirmations < 0 {
        let prev_hash = next_header
            .previous_block_hash
            .ok_or(Error::ReorgDepthExceeded)?;
        let prev_header = self.client.get_block_header_info(&prev_hash)?;
        next_header = prev_header;
    }

    next_hash = next_header.hash;
    let next_height: u32 = next_header.height.try_into()?;

    // Reconstruct the block header from the already-fetched GetBlockHeaderResult,
    // avoiding an extra `get_block_header` RPC call.
    // NOTE(review): assumes these RPC fields round-trip exactly, i.e. hashing the
    // rebuilt header reproduces `next_header.hash` — confirm for implementations of
    // `D: ToBlockHash` that recompute the hash.
    let block_header = Header {
        version: next_header.version,
        // `previous_block_hash` is `None` only for the genesis block, whose prev
        // hash is all zeros by consensus — presumably unreachable here; verify.
        prev_blockhash: next_header
            .previous_block_hash
            .unwrap_or_else(BlockHash::all_zeros),
        merkle_root: next_header.merkle_root,
        // NOTE(review): silent narrowing cast; block timestamps fit u32 until 2106,
        // but a checked conversion would be safer.
        time: next_header.time as u32,
        bits: CompactTarget::from_unprefixed_hex(&next_header.bits)
            .map_err(|_| Error::InvalidBits(next_header.bits.clone()))?,
        nonce: next_header.nonce,
    };

    cp = cp.insert(next_height, (self.to_data)(block_header));

    // Only fetch the full block when the compact filter matches one of our SPKs.
    let mut block = None;
    let filter = BlockFilter::new(self.client.get_block_filter(&next_hash)?.filter.as_slice());
    if filter
        .match_any(&next_hash, self.spks.iter().map(ScriptBuf::as_ref))
        .map_err(Error::Bip158)?
    {
        block = Some(self.client.get_block(&next_hash)?);
    }

    // Store the next header so the following call resumes from here.
    self.header = Some(next_header);
    // Update self.cp
    self.cp = cp.clone();

    Ok(Some(Event { cp, block }))
}
}

/// Event returned by [`FilterIter`].
#[derive(Debug, Clone)]
pub struct Event {
pub struct Event<D = BlockHash> {
/// Checkpoint
pub cp: CheckPoint,
pub cp: CheckPoint<D>,
/// Block, will be `Some(..)` for matching blocks
pub block: Option<Block>,
}
Expand All @@ -92,60 +194,14 @@ impl Event {
}
}

impl Iterator for FilterIter<'_> {
type Item = Result<Event, Error>;
impl<D> Iterator for FilterIter<'_, D>
where
D: ToBlockHash + Clone + Debug,
{
type Item = Result<Event<D>, Error>;

fn next(&mut self) -> Option<Self::Item> {
(|| -> Result<Option<_>, Error> {
let mut cp = self.cp.clone();

let header = match self.header.take() {
Some(header) => header,
// If no header is cached we need to locate a base of the local
// checkpoint from which the scan may proceed.
None => self.find_base()?,
};

let mut next_hash = match header.next_block_hash {
Some(hash) => hash,
None => return Ok(None),
};

let mut next_header = self.client.get_block_header_info(&next_hash)?;

// In case of a reorg, rewind by fetching headers of previous hashes until we find
// one with enough confirmations.
while next_header.confirmations < 0 {
let prev_hash = next_header
.previous_block_hash
.ok_or(Error::ReorgDepthExceeded)?;
let prev_header = self.client.get_block_header_info(&prev_hash)?;
next_header = prev_header;
}

next_hash = next_header.hash;
let next_height: u32 = next_header.height.try_into()?;

cp = cp.insert(next_height, next_hash);

let mut block = None;
let filter =
BlockFilter::new(self.client.get_block_filter(&next_hash)?.filter.as_slice());
if filter
.match_any(&next_hash, self.spks.iter().map(ScriptBuf::as_ref))
.map_err(Error::Bip158)?
{
block = Some(self.client.get_block(&next_hash)?);
}

// Store the next header
self.header = Some(next_header);
// Update self.cp
self.cp = cp.clone();

Ok(Some(Event { cp, block }))
})()
.transpose()
self.try_next().transpose()
}
}

Expand All @@ -160,6 +216,8 @@ pub enum Error {
ReorgDepthExceeded,
/// Error converting an integer
TryFromInt(core::num::TryFromIntError),
/// Invalid bits string from RPC
InvalidBits(String),
}

impl core::fmt::Display for Error {
Expand All @@ -169,6 +227,7 @@ impl core::fmt::Display for Error {
Self::Bip158(e) => write!(f, "{e}"),
Self::ReorgDepthExceeded => write!(f, "maximum reorg depth exceeded"),
Self::TryFromInt(e) => write!(f, "{e}"),
Self::InvalidBits(s) => write!(f, "invalid bits string: {s}"),
}
}
}
Expand Down
Loading
Loading