
Zmq blockhash notifications #150

Open
w3irdrobot wants to merge 17 commits into bennyhodl:master from w3irdrobot:zmq-blockhash-notifications

Conversation

@w3irdrobot

This PR adds the ability for a node to listen on a bitcoind blockhash ZeroMQ endpoint and force a wallet sync and contract monitor check.

Current setup

The wallet syncs and contract monitor checks are executed on a set interval: 60 seconds for wallet syncs and 30 seconds for contract checks. This works fine. However, if a block (or several) is found within that window, the node won't pick it up until the next interval fires. This results in unnecessary lag between a block being mined and the node's state updating.

New ZMQ integration

This PR adds the ability for a node to configure a ZMQ blockhash notification endpoint on a local bitcoind node. This lets the software know immediately when a new block is found and sync as soon as that notification is seen. In local testing with a regtest node, the notification of a new block arrived immediately, with no lag.

This could be implemented behind a Cargo feature flag. I figured doing it without one is okay for now, since the flag would spread through the code base quite a bit. If it's desired, I can add one.

Configuration updates

The DDK builder now has a set_zmq_blockhash_endpoint method for setting this endpoint. In addition, ddk-node has a zmq-blockhash-endpoint flag that sets the DDK builder option under the hood. node-one in the justfile has been updated to turn this on. Because we are using zmqpubhashblock and not zmqpubrawblock, I needed to add that flag to the bitcoind node's configuration in the docker-compose.yaml file.
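For reference, the bitcoind side of that wiring looks roughly like this (the zmqpubhashblock option is bitcoind's; the address and port here are illustrative, not necessarily the values used in this repo):

```
# bitcoin.conf -- ask bitcoind to publish block hashes over ZMQ.
# Equivalently, pass -zmqpubhashblock=... on the command line or in
# the bitcoind service's command in docker-compose.yaml.
zmqpubhashblock=tcp://0.0.0.0:28332
```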

Note: zmqpubhashblock was chosen over zmqpubrawblock since this particular story was more concerned with the timing of the syncing and less with removing HTTP requests from the syncing process entirely. Receiving and parsing an entire block seemed wasteful when we really only wanted to know that a new block was received. There is a future where zmqpubrawblock is used exclusively and that block is propagated through the system. However, due to the EsploraClient's internal clients leaking out in certain areas, that would be a much larger refactor. I started down that road and retreated.

Fallback

The default is the existing behavior; the ZMQ path is only used when explicitly configured. When the feature is on, the polling is still there, but the intervals between polls are much longer and serve as a backup in case something goes wrong with the ZMQ connection. That said, ZMQ is supposed to be self-healing as I understand it.
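As a sketch of that fallback shape (std-only, with an mpsc channel standing in for the tokio watch channel the code actually uses; names are illustrative):

```rust
use std::sync::mpsc;
use std::time::Duration;

// Wait for a new-block notification, falling back to a (long) poll
// timeout if none arrives. Returns which path woke us up.
fn wait_for_block_or_timeout(rx: &mpsc::Receiver<String>, poll_secs: u64) -> &'static str {
    match rx.recv_timeout(Duration::from_secs(poll_secs)) {
        Ok(_hash) => "zmq", // notification arrived: sync immediately
        Err(_) => "poll",   // backup interval fired (or sender dropped)
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send("00ab...".to_string()).unwrap();
    println!("{}", wait_for_block_or_timeout(&rx, 1)); // prints "zmq"
    println!("{}", wait_for_block_or_timeout(&rx, 1)); // times out: prints "poll"
}
```

Either way the sync runs; the ZMQ notification just lets it run sooner.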

Testing

Local testing was done with the following process:

  1. Run just deps
  2. Log in to the local database and run CREATE DATABASE ddk; CREATE DATABASE ddk_one; CREATE DATABASE ddk_two;. These databases weren't autocreated.
  3. Create the wallet with just bc createwallet ddk
  4. Mine some blocks with just bc generatetoaddress 101 $(just bc getnewaddress)
  5. Spin up node one with just node-one.
  6. In another terminal, mine another block and observe the logs: just bc generatetoaddress 1 $(just bc getnewaddress)

Note: I used log_info when testing since the debug logs didn't seem to be coming out, and I didn't spend much time debugging that. I changed most of those logs back to log_debug once I confirmed things worked.

Contributor

@matthewjablack matthewjablack left a comment


Looking solid

Couple nits:

  1. Would be good to add ZMQ_BLOCKHASH_ENDPOINT to .env.sample
  2. Also would be good to see a test for the new ZMQ functionality (i.e. unit test for ZeromqMessage::new_blockhash to double check the byte reversal logic matches Bitcoin's ZMQ spec)
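A minimal sketch of such a test, assuming the helper turns the raw 32-byte ZMQ frame into a display-order hex string (bitcoind's hashblock topic delivers the hash in internal, little-endian byte order, so it must be reversed for display; blockhash_hex_from_zmq here is a hypothetical stand-in for ZeromqMessage::new_blockhash):

```rust
// Hypothetical stand-in for the byte handling in ZeromqMessage::new_blockhash:
// reverse the 32-byte internal-order hash from the ZMQ frame and hex-encode it.
fn blockhash_hex_from_zmq(frame: &[u8]) -> Option<String> {
    if frame.len() != 32 {
        return None; // hashblock frames carry exactly 32 bytes
    }
    let mut bytes = frame.to_vec();
    bytes.reverse();
    Some(bytes.iter().map(|b| format!("{:02x}", b)).collect())
}

fn main() {
    // The regtest genesis hash in display order...
    let display = "0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206";
    // ...converted to the reversed byte order bitcoind emits over ZMQ.
    let mut frame: Vec<u8> = (0..display.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&display[i..i + 2], 16).unwrap())
        .collect();
    frame.reverse();
    // Round-tripping recovers the display-order hash.
    assert_eq!(blockhash_hex_from_zmq(&frame).unwrap(), display);
    // Wrong-length frames are rejected.
    assert!(blockhash_hex_from_zmq(&[0u8; 31]).is_none());
}
```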

Comment thread ddk/src/ddk.rs Outdated
} else {
log_debug!(logger_clone, "Listening for sleep completion");
sleep.await;
}
Contributor


Would be good to DRY this (very similar to code above)

Comment thread ddk/src/ddk.rs Outdated
}
} else {
log_debug!(logger_clone, "Listening for sleep completion");
sleep.await;
Contributor


Would be good to DRY this (very similar to code below)

Comment thread ddk/src/chain/zmq.rs Outdated

let sender_clone = sender.clone();
let logger_clone = logger.clone();
tokio::spawn(async move { listen_and_notify(socket, sender_clone, logger_clone).await });
Contributor


The spawned listener task doesn't respond to the stop signal, so it will continue running after ddk shuts down until the process exits. Might be good to pass the stop signal receiver here and check it in the select:

select! {
    _ = stop_signal.changed() => {
        log_info!(logger, "ZMQ subscriber shutting down");
        break;
    }
    message = socket.recv() => { ... }
}

Contributor

@matthewjablack matthewjablack Feb 17, 2026


Also the spawned task returns Result<(), Error> but the result is discarded. If the task errors (e.g., socket.subscribe fails), it will silently fail. Would be good to either:

  1. Make the function return () and log errors internally
  2. Or store the JoinHandle and check it periodically
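Option 2 could look roughly like this (std threads standing in for the tokio task; the error is simulated, and spawn_listener is an illustrative name):

```rust
use std::thread;

// Keep the JoinHandle so a failure in the spawned listener is observed
// rather than silently discarded when the handle is dropped.
fn spawn_listener() -> thread::JoinHandle<Result<(), String>> {
    thread::spawn(|| {
        // Stand-in for socket.subscribe and the receive loop; here it
        // always fails so the error path is visible.
        Err::<(), String>("subscribe failed".into())
    })
}

fn main() {
    let handle = spawn_listener();
    // Later (e.g. at shutdown or on a periodic check), surface the result.
    match handle.join() {
        Ok(Err(e)) => eprintln!("ZMQ listener exited with error: {e}"),
        Ok(Ok(())) => {}
        Err(_) => eprintln!("ZMQ listener panicked"),
    }
}
```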

Author


it returns a result purely to handle the potential socket.subscribe error. errors from zmq are ignored since zmq is supposed to be self-healing, so we don't necessarily want to error out if we lose the connection for a short period. any errors parsing the message are also handled without killing the task, to ensure notifications are still handled.

as for the stop signal...i did not know that was a thing but i will definitely get that in there.

Contributor


Thanks for fixing in d64b09a!

Comment thread ddk/src/chain/zmq.rs Outdated

let blockhash = hex::encode(hash);
match handle_message_body(hash, &sender) {
Ok(_) => log_debug!(logger, "Blockhash {} sucessfully sent from ZMQ client", blockhash),
Contributor


Suggested change
Ok(_) => log_debug!(logger, "Blockhash {} sucessfully sent from ZMQ client", blockhash),
Ok(_) => log_debug!(logger, "Blockhash {} successfully sent from ZMQ client", blockhash),

Author


good catch. that's embarrassing. usually i'm a much better speller.

Comment thread ddk-node/src/opts.rs Outdated
#[arg(help = "Url for the postgres database connection.")]
pub postgres_url: String,
#[arg(long)]
#[arg(help = "Endpoint for bitcoind ZeroMQ blockhash notfications")]
Contributor


Suggested change
#[arg(help = "Endpoint for bitcoind ZeroMQ blockhash notfications")]
#[arg(help = "Endpoint for bitcoind ZeroMQ blockhash notifications")]

Comment thread ddk-node/src/lib.rs Outdated
builder.set_oracle(oracle.clone());
builder.set_logger(logger.clone());

if let Some(endpoint) = opts.zmq_blockhash_endpoint {
Contributor


If ZMQ_BLOCKHASH_ENDPOINT is set but empty (e.g. in .env), just node-one will pass Some("") and the builder will try to connect to an empty endpoint. Consider treating empty string as “disabled” in the builder or in lib.rs, e.g.:

if let Some(endpoint) = opts.zmq_blockhash_endpoint.filter(|s| !s.is_empty()) {
    builder.set_zmq_blockhash_endpoint(endpoint);   
}

Author


good call

@w3irdrobot w3irdrobot requested a review from rorp February 21, 2026 02:10
Contributor

@rorp rorp left a comment


👍

Owner

@bennyhodl bennyhodl left a comment


Great work on this! There are a few nits when it comes to logging and the channel. I did find that the log level is hard-coded to LogLevel::Info; could you change it to use the passed option?

LogLevel::Info,

Comment thread ddk/src/builder.rs Outdated
self
}

/// Set the esplora server to connect to.
Owner


Should be bitcoind

Comment thread ddk/src/ddk.rs Outdated
logger,
"Listening for ZMQ notifications or sleep completion"
);
let mut new_block = zmq_client.subscribe();
Owner


This creates a new subscription which pollutes the logs and we don't necessarily need to create a new subscriber on each loop. Instead, we can pass the receiver since the channel is in an Arc and a clone is sent to the spawned task.

 async fn wait(
      new_block: &mut Option<watch::Receiver<ZeromqMessage>>,
      wait_time: u64,
      logger: Arc<Logger>,
  ) {
      let sleep = tokio::time::sleep(Duration::from_secs(wait_time));
      if let Some(receiver) = new_block {
          log_debug!(
              logger,
              "Listening for ZMQ notifications or sleep completion"
          );
          select! {
              _ = receiver.changed() => {},
              _ = sleep => {}
          }
      } else {
          log_debug!(logger, "Listening for sleep completion");
          sleep.await;
      }
  }

Comment thread ddk/src/chain/zmq.rs Outdated
continue;
}
};
log_debug!(logger, "ZMQ message received: {:?}", message);
Owner


The log here could be removed. We already log the blockhash coming in on line 132, and this just prints the frames with escaped characters and the hex representation of the hash:

ZMQ message received: ZmqMessage { frames: [b"hashblock", b"y\xa9N\xb0\x97D`\xefo\xe8\xe2\\G\x03\xfc{\xf1\x97E'\x1a\xfbfdMp\x16#og\x92\xba", b"\x85\x01\0\0"] }

Comment thread justfile Outdated
- cargo run --bin ddk-node -- --network regtest --esplora $ESPLORA_HOST --name node-one --postgres-url postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST/ddk_one --log debug --zmq-blockhash-endpoint $ZMQ_BLOCKHASH_ENDPOINT

node-two:
- cargo run --bin ddk-node -- --network regtest --esplora $ESPLORA_HOST --port 1777 --grpc 0.0.0.0:3031 --storage-dir ~/.ddk/node-two --name node-two --postgres-url postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST/ddk_two --log debug
Owner


Add ZMQ endpoint to node two

@w3irdrobot w3irdrobot requested a review from bennyhodl February 25, 2026 01:50