Conversation
matthewjablack left a comment
Looking solid
Couple nits:
- Would be good to add `ZMQ_BLOCKHASH_ENDPOINT` to `.env.sample`
- Also would be good to see a test for the new ZMQ functionality (i.e. a unit test for `ZeromqMessage::new_blockhash` to double check the byte reversal logic matches Bitcoin's ZMQ spec)
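For illustration, a self-contained sketch of what such a test could check: bitcoind's ZMQ `hashblock` frames carry the hash in little-endian (internal) byte order, while RPC and explorers display it big-endian, so the bytes must be reversed before hex-encoding. The function name here is hypothetical, standing in for the logic inside `ZeromqMessage::new_blockhash`:

```rust
// Hypothetical stand-in for the byte-reversal logic in ZeromqMessage::new_blockhash.
// Bitcoin's ZMQ spec sends the 32-byte hash little-endian; display order is reversed.
fn blockhash_from_zmq(raw: &[u8; 32]) -> String {
    raw.iter()
        .rev() // reverse little-endian wire order into display order
        .map(|b| format!("{:02x}", b))
        .collect()
}

fn main() {
    // The mainnet genesis block hash as it would arrive over ZMQ (little-endian).
    let raw: [u8; 32] = [
        0x6f, 0xe2, 0x8c, 0x0a, 0xb6, 0xf1, 0xb3, 0x72, 0xc1, 0xa6, 0xa2, 0x46,
        0xae, 0x63, 0xf7, 0x4f, 0x93, 0x1e, 0x83, 0x65, 0xe1, 0x5a, 0x08, 0x9c,
        0x68, 0xd6, 0x19, 0x00, 0x00, 0x00, 0x00, 0x00,
    ];
    assert_eq!(
        blockhash_from_zmq(&raw),
        "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
    );
}
```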
```rust
} else {
    log_debug!(logger_clone, "Listening for sleep completion");
    sleep.await;
}
```
Would be good to DRY this (very similar to code above)
```rust
    }
} else {
    log_debug!(logger_clone, "Listening for sleep completion");
    sleep.await;
```
Would be good to DRY this (very similar to code below)
```rust
let sender_clone = sender.clone();
let logger_clone = logger.clone();
tokio::spawn(async move { listen_and_notify(socket, sender_clone, logger_clone).await });
```
The spawned listener task doesn't respond to the stop signal, so it will continue running after ddk shuts down until the process exits. Might be good to pass the stop signal receiver here and check it in the select:

```rust
select! {
    _ = stop_signal.changed() => {
        log_info!(logger, "ZMQ subscriber shutting down");
        break;
    }
    message = socket.recv() => { ... }
}
```
Also the spawned task returns Result<(), Error> but the result is discarded. If the task errors (e.g., socket.subscribe fails), it will silently fail. Would be good to either:
- Make the function return () and log errors internally
- Or store the JoinHandle and check it periodically
It returns a result purely to handle the potential error from `socket.subscribe`. Errors from ZMQ are ignored since ZMQ is supposed to be self-healing, so we don't necessarily want to error out if we lose the connection for a short period. Any other errors parsing the message are also handled without killing the task, to ensure notifications are still handled.
As for the stop signal... I did not know that was a thing, but I will definitely get that in there.
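As a minimal self-contained illustration of the "log and keep going" approach described above (function name and error type are hypothetical, not the PR's actual code):

```rust
// Hypothetical stand-in for message parsing inside the listener task.
fn parse_blockhash(frame: &[u8]) -> Result<[u8; 32], String> {
    frame
        .try_into()
        .map_err(|_| format!("expected 32 bytes, got {}", frame.len()))
}

fn main() {
    let frames: Vec<Vec<u8>> = vec![vec![0u8; 32], vec![1, 2, 3], vec![7u8; 32]];
    let mut processed = 0;
    for frame in &frames {
        match parse_blockhash(frame) {
            Ok(_) => processed += 1,
            // A malformed frame is logged and skipped; the listener loop survives.
            Err(e) => eprintln!("ignoring bad ZMQ frame: {}", e),
        }
    }
    assert_eq!(processed, 2); // the bad frame did not kill the loop
}
```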
```rust
let blockhash = hex::encode(hash);
match handle_message_body(hash, &sender) {
    Ok(_) => log_debug!(logger, "Blockhash {} sucessfully sent from ZMQ client", blockhash),
```
```diff
- Ok(_) => log_debug!(logger, "Blockhash {} sucessfully sent from ZMQ client", blockhash),
+ Ok(_) => log_debug!(logger, "Blockhash {} successfully sent from ZMQ client", blockhash),
```
Good catch. That's embarrassing. Usually I'm a much better speller.
```rust
#[arg(help = "Url for the postgres database connection.")]
pub postgres_url: String,
#[arg(long)]
#[arg(help = "Endpoint for bitcoind ZeroMQ blockhash notfications")]
```
```diff
- #[arg(help = "Endpoint for bitcoind ZeroMQ blockhash notfications")]
+ #[arg(help = "Endpoint for bitcoind ZeroMQ blockhash notifications")]
```
```rust
builder.set_oracle(oracle.clone());
builder.set_logger(logger.clone());

if let Some(endpoint) = opts.zmq_blockhash_endpoint {
```
If `ZMQ_BLOCKHASH_ENDPOINT` is set but empty (e.g. in `.env`), `just node-one` will pass `Some("")` and the builder will try to connect to an empty endpoint. Consider treating an empty string as "disabled" in the builder or in lib.rs, e.g.:

```rust
if let Some(endpoint) = opts.zmq_blockhash_endpoint.filter(|s| !s.is_empty()) {
    builder.set_zmq_blockhash_endpoint(endpoint);
}
```

```rust
    self
}

/// Set the esplora server to connect to.
```
```rust
logger,
"Listening for ZMQ notifications or sleep completion"
);
let mut new_block = zmq_client.subscribe();
```
This creates a new subscription, which pollutes the logs, and we don't necessarily need to create a new subscriber on each loop. Instead, we can pass the receiver, since the channel is in an Arc and a clone is sent to the spawned task.

```rust
async fn wait(
    new_block: &mut Option<watch::Receiver<ZeromqMessage>>,
    wait_time: u64,
    logger: Arc<Logger>,
) {
    let sleep = tokio::time::sleep(Duration::from_secs(wait_time));
    if let Some(receiver) = new_block {
        log_debug!(
            logger,
            "Listening for ZMQ notifications or sleep completion"
        );
        select! {
            _ = receiver.changed() => {},
            _ = sleep => {}
        }
    } else {
        log_debug!(logger, "Listening for sleep completion");
        sleep.await;
    }
}
```

```rust
            continue;
        }
    };
    log_debug!(logger, "ZMQ message received: {:?}", message);
```
The log here could be removed. We log the blockhash coming in on line 132, and this just prints the frames with escaped characters and a hex representation of the hash:

```
ZMQ message received: ZmqMessage { frames: [b"hashblock", b"y\xa9N\xb0\x97D`\xefo\xe8\xe2\\G\x03\xfc{\xf1\x97E'\x1a\xfbfdMp\x16#og\x92\xba", b"\x85\x01\0\0"] }
```
```
- cargo run --bin ddk-node -- --network regtest --esplora $ESPLORA_HOST --name node-one --postgres-url postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST/ddk_one --log debug --zmq-blockhash-endpoint $ZMQ_BLOCKHASH_ENDPOINT

node-two:
- cargo run --bin ddk-node -- --network regtest --esplora $ESPLORA_HOST --port 1777 --grpc 0.0.0.0:3031 --storage-dir ~/.ddk/node-two --name node-two --postgres-url postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST/ddk_two --log debug
```
This PR adds the ability for a node to listen on a bitcoind blockhash ZeroMQ endpoint and force a wallet sync and contract monitor check.
Current setup
The wallet syncs and contract monitor checks are executed on a set interval: 60 seconds for wallet syncs and 30 seconds for contract checks. This is fine and functions well. However, if a block (or multiple blocks) is found within that window, the node won't pick it up until the interval has elapsed again. This results in unnecessary lag between a block being mined and the node's state updating.
New ZMQ integration
This PR adds the ability for a node to configure a ZMQ blockhash notification endpoint on a local bitcoind node. This allows the software to know immediately when a new block is found and sync right when that notification is seen. In local testing with a regtest node, the notification of a new block was immediate, with no lag.
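For context, this is what enabling hash-block notifications looks like on the bitcoind side; the endpoint value below is an example, not necessarily the one used in this repo's `docker-compose.yaml`:

```
# bitcoin.conf: publish the hash of each new block over ZeroMQ
zmqpubhashblock=tcp://0.0.0.0:28332
```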
This could be implemented with a feature flag in Cargo. I figured doing it without one is okay for now since that feature flag would infect the code base quite a bit. If it's desired, I can add that in.
Configuration updates
The DDK builder now has a
set_zmq_blockhash_endpointfor setting this endpoint. In addition, theddk-nodehas azmq-blockhash-endpointflag that ends up setting the DDK builder flag under the hood.node-onein thejustfilehas been updated to have this turned on. Because we are using thezmqpubhashblockand notzmqpubrawblock, I needed to add that flag to the bitcoind node's configuration in thedocker-compose.yamlfile.Fallback
The default is the existing behavior; the ZMQ path is only used when configured to be on. When the feature is in use, the polling is still there, but the polling intervals are much larger and act as a backup in case something happens to the ZMQ connection. That said, ZMQ is supposed to be self-healing as I understand it.
Testing
Local testing was done with the following process:
1. `just deps`
2. `CREATE DATABASE ddk; CREATE DATABASE ddk_one; CREATE DATABASE ddk_two;` (these databases weren't autocreated)
3. `just bc createwallet ddk`
4. `just bc generatetoaddress 101 $(just bc getnewaddress)`
5. `just node-one`
6. `just bc generatetoaddress 1 $(just bc getnewaddress)`