From 84c82c9a2fd3fb7f9bf526f7fedd334210afe10c Mon Sep 17 00:00:00 2001 From: Kai Mast Date: Tue, 18 Nov 2025 21:27:37 -0800 Subject: [PATCH 1/6] feat: shut down gracefully --- Cargo.lock | 80 +++++++----- Cargo.toml | 7 +- cli/Cargo.toml | 3 + cli/src/commands/developer/scan.rs | 4 +- cli/src/commands/start.rs | 101 ++++++--------- display/Cargo.toml | 3 + display/src/lib.rs | 15 ++- node/Cargo.toml | 6 +- node/bft/Cargo.toml | 3 + node/bft/examples/simple_node.rs | 8 +- node/bft/ledger-service/Cargo.toml | 3 + node/bft/ledger-service/src/ledger.rs | 21 ++-- node/bft/ledger-service/src/translucent.rs | 18 +-- node/bft/src/sync/mod.rs | 13 +- node/bft/tests/bft_e2e.rs | 12 -- node/bft/tests/common/primary.rs | 5 +- node/bft/tests/common/utils.rs | 6 +- node/bft/tests/components/worker.rs | 12 +- node/bft/tests/gateway_e2e.rs | 3 +- node/bft/tests/narwhal_e2e.rs | 12 -- node/cdn/Cargo.toml | 3 + node/cdn/src/blocks.rs | 45 ++++--- node/router/Cargo.toml | 7 +- node/src/bootstrap_client/mod.rs | 18 +++ node/src/client/mod.rs | 44 +++---- node/src/node.rs | 51 ++++---- node/src/prover/mod.rs | 34 ++--- node/src/traits.rs | 93 +++----------- node/src/validator/mod.rs | 30 ++--- node/tests/common/node.rs | 12 +- utilities/Cargo.toml | 25 ++++ utilities/src/lib.rs | 19 +++ utilities/src/signals.rs | 140 +++++++++++++++++++++ 33 files changed, 494 insertions(+), 362 deletions(-) create mode 100644 utilities/Cargo.toml create mode 100644 utilities/src/lib.rs create mode 100644 utilities/src/signals.rs diff --git a/Cargo.lock b/Cargo.lock index 21cba5b1a5..fcb7a1ca0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -587,9 +587,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.47" +version = "1.2.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd405d82c84ff7f35739f175f67d8b9fb7687a0e84ccdc78bd3568839827cf07" +checksum = "c481bdbf0ed3b892f6f806287d72acd515b352a4ec27a208489b8c1bc839633a" dependencies = [ "find-msvc-tools", "jobserver", 
@@ -2252,9 +2252,9 @@ dependencies = [ [[package]] name = "instability" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "435d80800b936787d62688c927b6490e887c7ef5ff9ce922c6c6050fca75eb9a" +checksum = "6778b0196eefee7df739db78758e5cf9b37412268bfa5650bfeed028aed20d9c" dependencies = [ "darling 0.20.11", "indoc", @@ -2340,9 +2340,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.82" +version = "0.3.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b011eec8cc36da2aab2d5cff675ec18454fad408585853910a202391cf9f8e65" +checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" dependencies = [ "once_cell", "wasm-bindgen", @@ -3651,9 +3651,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" +checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" dependencies = [ "web-time", "zeroize", @@ -3912,9 +3912,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.16.0" +version = "3.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10574371d41b0d9b2cff89418eda27da52bcaff2cc8741db26382a77c29131f1" +checksum = "4fa237f2807440d238e0364a218270b98f767a00d3dada77b1c53ae88940e2e7" dependencies = [ "base64 0.22.1", "chrono", @@ -3931,9 +3931,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.16.0" +version = "3.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08a72d8216842fdd57820dc78d840bef99248e35fb2554ff923319e60f2d686b" +checksum = "52a8e3ca0ca629121f70ab50f95249e5a6f925cc0f6ffe8256c45b728875706c" dependencies = [ "darling 0.21.3", "proc-macro2", @@ -4136,6 +4136,7 @@ dependencies = [ "snarkos-node-cdn", "snarkos-node-metrics", 
"snarkos-node-rest", + "snarkos-utilities", "snarkvm", "sys-info", "tempfile", @@ -4156,6 +4157,7 @@ dependencies = [ "crossterm 0.29.0", "ratatui", "snarkos-node", + "snarkos-utilities", "snarkvm", "tokio", ] @@ -4176,7 +4178,6 @@ dependencies = [ "locktick", "lru 0.16.2", "num_cpus", - "once_cell", "parking_lot", "paste", "pea2pea", @@ -4193,6 +4194,7 @@ dependencies = [ "snarkos-node-router", "snarkos-node-sync", "snarkos-node-tcp", + "snarkos-utilities", "snarkvm", "time", "tokio", @@ -4241,6 +4243,7 @@ dependencies = [ "snarkos-node-network", "snarkos-node-sync", "snarkos-node-tcp", + "snarkos-utilities", "snarkvm", "test-log", "test-strategy 0.4.3", @@ -4263,7 +4266,6 @@ dependencies = [ "indexmap 2.12.1", "proptest", "serde", - "snarkos-node-network", "snarkos-node-sync-locators", "snarkvm", "test-strategy 0.4.3", @@ -4284,6 +4286,7 @@ dependencies = [ "rand 0.8.5", "rayon", "snarkos-node-metrics", + "snarkos-utilities", "snarkvm", "tokio", "tracing", @@ -4318,6 +4321,7 @@ dependencies = [ "serde", "serde_json", "snarkos-node-metrics", + "snarkos-utilities", "snarkvm", "tokio", "tokio-test", @@ -4524,6 +4528,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "snarkos-utilities" +version = "4.2.1" +dependencies = [ + "tokio", + "tracing", +] + [[package]] name = "snarkvm" version = "4.4.0" @@ -5697,9 +5709,9 @@ dependencies = [ [[package]] name = "test-log-macros" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "451b374529930d7601b1eef8d32bc79ae870b6079b069401709c2a8bf9e75f36" +checksum = "be35209fd0781c5401458ab66e4f98accf63553e8fae7425503e92fdd319783b" dependencies = [ "proc-macro2", "quote 1.0.42", @@ -6132,9 +6144,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = 
"2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" dependencies = [ "log", "pin-project-lite", @@ -6425,9 +6437,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da95793dfc411fbbd93f5be7715b0578ec61fe87cb1a42b12eb625caa5c5ea60" +checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" dependencies = [ "cfg-if", "once_cell", @@ -6438,9 +6450,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.55" +version = "0.4.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "551f88106c6d5e7ccc7cd9a16f312dd3b5d36ea8b4954304657d5dfba115d4a0" +checksum = "836d9622d604feee9e5de25ac10e3ea5f2d65b41eac0d9ce72eb5deae707ce7c" dependencies = [ "cfg-if", "js-sys", @@ -6451,9 +6463,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04264334509e04a7bf8690f2384ef5265f05143a4bff3889ab7a3269adab59c2" +checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" dependencies = [ "quote 1.0.42", "wasm-bindgen-macro-support", @@ -6461,9 +6473,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420bc339d9f322e562942d52e115d57e950d12d88983a14c79b86859ee6c7ebc" +checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" dependencies = [ "bumpalo", "proc-macro2", @@ -6474,18 +6486,18 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.105" +version = "0.2.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76f218a38c84bcb33c25ec7059b07847d465ce0e0a76b995e134a45adcb6af76" +checksum = 
"cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" dependencies = [ "unicode-ident", ] [[package]] name = "web-sys" -version = "0.3.82" +version = "0.3.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a1f95c0d03a47f4ae1f7a64643a6bb97465d9b740f0fa8f90ea33915c99a9a1" +checksum = "9b32828d774c412041098d182a8b38b16ea816958e07cf40eec2bc080ae137ac" dependencies = [ "js-sys", "wasm-bindgen", @@ -6848,18 +6860,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.30" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea879c944afe8a2b25fef16bb4ba234f47c694565e97383b36f3a878219065c" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.30" +version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf955aa904d6040f70dc8e9384444cb1030aed272ba3cb09bbc4ab9e7c1f34f5" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" dependencies = [ "proc-macro2", "quote 1.0.42", diff --git a/Cargo.toml b/Cargo.toml index 1c82514cce..d0c3363eaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,8 @@ members = [ "node/sync", "node/sync/communication-service", "node/sync/locators", - "node/tcp" + "node/tcp", + "utilities", ] [workspace.dependencies.aleo-std] @@ -249,6 +250,10 @@ version = "=4.4.0" path = "node/tcp" version = "=4.4.0" +[workspace.dependencies.snarkos-utilities] +path = "utilities" +version = "=4.2.1" + [[bin]] name = "snarkos" path = "snarkos/main.rs" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 72da717c9a..f7b9c6e36a 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -126,6 +126,9 @@ optional = true [dependencies.snarkos-node-rest] workspace = true +[dependencies.snarkos-utilities] +workspace = true + [dependencies.snarkvm] workspace = true features = [ "parameters", "circuit", 
"package" ] diff --git a/cli/src/commands/developer/scan.rs b/cli/src/commands/developer/scan.rs index 13b999b5b5..d51210ec1a 100644 --- a/cli/src/commands/developer/scan.rs +++ b/cli/src/commands/developer/scan.rs @@ -20,6 +20,8 @@ use crate::{ }; use snarkos_node_cdn::CDN_BASE_URL; +use snarkos_utilities::SimpleStoppable; + use snarkvm::{ console::network::Network, prelude::{Ciphertext, Field, Plaintext, PrivateKey, Record, ViewKey, block::Block}, @@ -312,7 +314,7 @@ impl Scan { let rt = tokio::runtime::Runtime::new()?; // Create a placeholder shutdown flag. - let _shutdown = Default::default(); + let _shutdown = SimpleStoppable::new(); // Copy endpoint for background task. let endpoint = endpoint.clone(); diff --git a/cli/src/commands/start.rs b/cli/src/commands/start.rs index 55d34da83a..32d463e599 100644 --- a/cli/src/commands/start.rs +++ b/cli/src/commands/start.rs @@ -24,6 +24,8 @@ use snarkos_node::{ rest::DEFAULT_REST_PORT, router::DEFAULT_NODE_PORT, }; +use snarkos_utilities::SignalHandler; + use snarkvm::{ console::{ account::{Address, PrivateKey}, @@ -57,7 +59,7 @@ use std::{ }; use tokio::{ runtime::{self, Runtime}, - sync::oneshot, + sync::mpsc, task, }; use tracing::warn; @@ -267,7 +269,7 @@ pub struct Start { } impl Start { - /// Starts the snarkOS node. + /// Starts the snarkOS node and blocks until it terminates. pub fn parse(self) -> Result { // Prepare the shutdown flag. let shutdown: Arc = Default::default(); @@ -285,57 +287,30 @@ impl Start { // Initialize the runtime. Self::runtime().block_on(async move { // Error messages. - let node_parse_error = || "Failed to parse node arguments"; - let display_start_error = || "Failed to initialize the display"; - - let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let node_parse_error = || "Failed to start node"; + let signal_handler = SignalHandler::new(); // Clone the configurations. - let mut cli = self.clone(); - // Parse the network. 
- match cli.network { - MainnetV0::ID => { - // Parse the node from the configurations. - let node = cli - .parse_node::(shutdown.clone(), shutdown_tx) - .await - .with_context(node_parse_error)?; - // If the display is enabled, render the display. - if !cli.nodisplay { - // Initialize the display. - Display::start(node, log_receiver).with_context(display_start_error)?; - } - } - TestnetV0::ID => { - // Parse the node from the configurations. - let node = cli - .parse_node::(shutdown.clone(), shutdown_tx) - .await - .with_context(node_parse_error)?; - // If the display is enabled, render the display. - if !cli.nodisplay { - // Initialize the display. - Display::start(node, log_receiver).with_context(display_start_error)?; - } - } - CanaryV0::ID => { - // Parse the node from the configurations. - let node = cli - .parse_node::(shutdown.clone(), shutdown_tx) - .await - .with_context(node_parse_error)?; - // If the display is enabled, render the display. - if !cli.nodisplay { - // Initialize the display. - Display::start(node, log_receiver).with_context(display_start_error)?; - } - } + let mut self_ = self.clone(); + + // Parse the node arguments, start it, and block until shutdown. + match self_.network { + MainnetV0::ID => self_ + .parse_node::(log_receiver, signal_handler.clone()) + .await + .with_context(node_parse_error)?, + + TestnetV0::ID => self_ + .parse_node::(log_receiver, signal_handler.clone()) + .await + .with_context(node_parse_error)?, + CanaryV0::ID => self_ + .parse_node::(log_receiver, signal_handler.clone()) + .await + .with_context(node_parse_error)?, _ => panic!("Invalid network ID specified"), }; - // Wait for the shutdown signal. - let _ = shutdown_rx.await; - Ok(String::new()) }) } @@ -600,9 +575,9 @@ impl Start { } } - /// Returns the node type corresponding to the given configurations. + /// Start the node and blocks until it terminates. 
#[rustfmt::skip] - async fn parse_node(&mut self, shutdown: Arc, shutdown_tx: oneshot::Sender<()>) -> Result> { + async fn parse_node(&mut self, log_receiver: mpsc::Receiver>, signal_handler: Arc) -> Result<()> { if !self.nobanner { // Print the welcome banner. println!("{}", crate::helpers::welcome_message()); @@ -742,24 +717,28 @@ impl Start { } }; - // TODO(kaimast): start the display earlier and show sync progress. if !self.nodisplay && !self.nocdn { println!("🪧 The terminal UI will not start until the node has finished syncing from the CDN. If this step takes too long, consider restarting with `--nodisplay`."); } - let shutdown_tx = Some(shutdown_tx); - // Initialize the node. - match node_type { - NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, self.trusted_peers_only, dev_txs, self.dev, shutdown.clone(), shutdown_tx).await, - NodeType::Prover => Node::new_prover(node_ip, account, &trusted_peers, genesis, storage_mode, self.trusted_peers_only, self.dev, shutdown.clone(), shutdown_tx).await, - NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, self.trusted_peers_only, self.dev, shutdown, shutdown_tx).await, + let node = match node_type { + NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, self.trusted_peers_only, dev_txs, self.dev, signal_handler.clone()).await, + NodeType::Prover => Node::new_prover(node_ip, account, &trusted_peers, genesis, storage_mode, self.trusted_peers_only, self.dev, signal_handler.clone()).await, + NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, self.trusted_peers_only, self.dev, signal_handler.clone()).await, NodeType::BootstrapClient => Node::new_bootstrap_client(node_ip, 
account, *genesis.header(), self.dev).await, + }?; + + if !self.nodisplay { + Display::start(node.clone(), log_receiver, signal_handler.clone()).with_context(|| "Failed to start the display")?; } + + node.wait_for_signals(&signal_handler).await; + Ok(()) } - /// Returns a runtime for the node. + /// Starts a rayon thread pool and tokio runtime for the node, and returns the tokio `Runtime`. fn runtime() -> Runtime { // Retrieve the number of cores. let num_cores = num_cpus::get(); @@ -770,14 +749,16 @@ impl Start { let (num_tokio_worker_threads, max_tokio_blocking_threads, num_rayon_cores_global) = (2 * num_cores, 512, num_cores); - // Initialize the parallelization parameters. + // Set up the rayon thread pool. + // A custom panic handler is not needed here, as rayon propagates the panic to the calling thread by default (except for `rayon::spawn` which we do not use). rayon::ThreadPoolBuilder::new() .stack_size(8 * 1024 * 1024) .num_threads(num_rayon_cores_global) .build_global() .unwrap(); - // Initialize the runtime configuration. + // Set up the tokio Runtime. + // TODO(kaimast): set up a panic handler here for each worker thread once [`tokio::runtime::Builder::unhandled_panic`](https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.unhandled_panic) is stabilized. 
runtime::Builder::new_multi_thread() .enable_all() .thread_stack_size(8 * 1024 * 1024) diff --git a/display/Cargo.toml b/display/Cargo.toml index 551e18fef7..4b5f034cff 100644 --- a/display/Cargo.toml +++ b/display/Cargo.toml @@ -28,6 +28,9 @@ version = "0.29" [dependencies.snarkos-node] workspace = true +[dependencies.snarkos-utilities] +workspace = true + [dependencies.snarkvm] workspace = true diff --git a/display/src/lib.rs b/display/src/lib.rs index 1a03058594..e97be68eae 100644 --- a/display/src/lib.rs +++ b/display/src/lib.rs @@ -22,6 +22,8 @@ mod tabs; use tabs::Tabs; use snarkos_node::Node; +use snarkos_utilities::Stoppable; + use snarkvm::prelude::Network; use anyhow::Result; @@ -41,6 +43,7 @@ use ratatui::{ }; use std::{ io, + sync::Arc, thread, time::{Duration, Instant}, }; @@ -67,7 +70,7 @@ fn content_style() -> Style { impl Display { /// Initializes a new display. - pub fn start(node: Node, log_receiver: Receiver>) -> Result<()> { + pub fn start(node: Node, log_receiver: Receiver>, stoppable: Arc) -> Result<()> { // Initialize the display. enable_raw_mode()?; let mut stdout = io::stdout(); @@ -84,7 +87,7 @@ impl Display { }; // Render the display. - let res = display.render(&mut terminal); + let res = display.render(&mut terminal, stoppable); // Terminate the display. disable_raw_mode()?; @@ -102,7 +105,7 @@ impl Display { impl Display { /// Renders the display. - fn render(&mut self, terminal: &mut Terminal) -> io::Result<()> { + fn render(&mut self, terminal: &mut Terminal, stoppable: Arc) -> io::Result<()> { let mut last_tick = Instant::now(); loop { terminal.draw(|f| self.draw(f))?; @@ -114,11 +117,7 @@ impl Display { if let Event::Key(key) = event::read()? { match key.code { KeyCode::Esc => { - // // TODO (howardwu): @ljedrz to implement a wrapping scope for Display within Node/Server. 
- // #[allow(unused_must_use)] - // { - // self.node.shut_down(); - // } + stoppable.stop(); return Ok(()); } KeyCode::Left => self.tabs.previous(), diff --git a/node/Cargo.toml b/node/Cargo.toml index 23b298eb59..9f9b4b28a0 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -83,9 +83,6 @@ workspace = true [dependencies.num_cpus] workspace = true -[dependencies.once_cell] -workspace = true - [dependencies.parking_lot] workspace = true @@ -130,6 +127,9 @@ workspace = true [dependencies.snarkos-node-tcp] workspace = true +[dependencies.snarkos-utilities] +workspace = true + [dependencies.snarkvm] workspace = true diff --git a/node/bft/Cargo.toml b/node/bft/Cargo.toml index 8e14863afa..f3dfef71c4 100644 --- a/node/bft/Cargo.toml +++ b/node/bft/Cargo.toml @@ -120,6 +120,9 @@ workspace = true [dependencies.snarkos-node-tcp] workspace = true +[dependencies.snarkos-utilities] +workspace = true + [dependencies.snarkvm] workspace = true features = [ "utilities" ] diff --git a/node/bft/examples/simple_node.rs b/node/bft/examples/simple_node.rs index a39770d8ea..99536287f1 100644 --- a/node/bft/examples/simple_node.rs +++ b/node/bft/examples/simple_node.rs @@ -19,7 +19,6 @@ extern crate tracing; #[cfg(feature = "metrics")] extern crate snarkos_node_metrics as metrics; -use aleo_std::StorageMode; use snarkos_account::Account; use snarkos_node_bft::{ BFT, @@ -30,6 +29,9 @@ use snarkos_node_bft::{ use snarkos_node_bft_ledger_service::TranslucentLedgerService; use snarkos_node_bft_storage_service::BFTMemoryService; use snarkos_node_sync::BlockSync; +use snarkos_utilities::SimpleStoppable; + +use aleo_std::StorageMode; use snarkvm::{ console::{account::PrivateKey, algorithms::BHP256, types::Address}, ledger::{ @@ -64,7 +66,7 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, str::FromStr, - sync::{Arc, Mutex, OnceLock, atomic::AtomicBool}, + sync::{Arc, Mutex, OnceLock}, }; use tokio::{net::TcpListener, sync::oneshot}; use tracing_subscriber::{ @@ -233,7 +235,7 
@@ fn create_ledger( } let mut rng = TestRng::default(); let gen_ledger = genesis_ledger(*gen_key, committee.clone(), balances.clone(), node_id, &mut rng); - Arc::new(TranslucentLedgerService::new(gen_ledger, Arc::new(AtomicBool::new(false)))) + Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new())) } pub type CurrentLedger = Ledger>; diff --git a/node/bft/ledger-service/Cargo.toml b/node/bft/ledger-service/Cargo.toml index 8bcb6ea3e6..e77a913704 100644 --- a/node/bft/ledger-service/Cargo.toml +++ b/node/bft/ledger-service/Cargo.toml @@ -48,6 +48,9 @@ optional = true workspace = true optional = true +[dependencies.snarkos-utilities] +workspace = true + [dependencies.parking_lot] workspace = true optional = true diff --git a/node/bft/ledger-service/src/ledger.rs b/node/bft/ledger-service/src/ledger.rs index 09ff58632c..95c63d6622 100644 --- a/node/bft/ledger-service/src/ledger.rs +++ b/node/bft/ledger-service/src/ledger.rs @@ -14,6 +14,9 @@ // limitations under the License. use crate::{LedgerService, fmt_id, spawn_blocking}; + +use snarkos_utilities::Stoppable; + use snarkvm::{ ledger::{ Ledger, @@ -49,28 +52,20 @@ use parking_lot::RwLock; #[cfg(not(feature = "serial"))] use rayon::prelude::*; -use std::{ - fmt, - io::Read, - ops::Range, - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, -}; +use std::{fmt, io::Read, ops::Range, sync::Arc}; /// A core ledger service. #[allow(clippy::type_complexity)] pub struct CoreLedgerService> { ledger: Ledger, latest_leader: Arc)>>>, - shutdown: Arc, + stoppable: Arc, } impl> CoreLedgerService { /// Initializes a new core ledger service. 
- pub fn new(ledger: Ledger, shutdown: Arc) -> Self { - Self { ledger, latest_leader: Default::default(), shutdown } + pub fn new(ledger: Ledger, stoppable: Arc) -> Self { + Self { ledger, latest_leader: Default::default(), stoppable } } } @@ -360,7 +355,7 @@ impl> LedgerService for CoreLedgerService< #[cfg(feature = "ledger-write")] fn advance_to_next_block(&self, block: &Block) -> Result<()> { // If the Ctrl-C handler registered the signal, then skip advancing to the next block. - if self.shutdown.load(Ordering::Acquire) { + if self.stoppable.is_stopped() { bail!("Skipping advancing to block {} - The node is shutting down", block.height()); } // Advance to the next block. diff --git a/node/bft/ledger-service/src/translucent.rs b/node/bft/ledger-service/src/translucent.rs index 7c3a5fcd26..b4ffb988df 100644 --- a/node/bft/ledger-service/src/translucent.rs +++ b/node/bft/ledger-service/src/translucent.rs @@ -14,8 +14,9 @@ // limitations under the License. use crate::{CoreLedgerService, LedgerService}; -use async_trait::async_trait; -use indexmap::IndexMap; + +use snarkos_utilities::Stoppable; + use snarkvm::{ ledger::{ Ledger, @@ -27,11 +28,10 @@ use snarkvm::{ }, prelude::{Address, ConsensusVersion, Field, Network, Result, narwhal::BatchCertificate}, }; -use std::{ - fmt, - ops::Range, - sync::{Arc, atomic::AtomicBool}, -}; + +use async_trait::async_trait; +use indexmap::IndexMap; +use std::{fmt, ops::Range, sync::Arc}; pub struct TranslucentLedgerService> { inner: CoreLedgerService, @@ -46,8 +46,8 @@ impl> fmt::Debug for TranslucentLedgerService impl> TranslucentLedgerService { /// Initializes a new ledger service wrapper. 
- pub fn new(ledger: Ledger, shutdown: Arc) -> Self { - Self { inner: CoreLedgerService::new(ledger, shutdown) } + pub fn new(ledger: Ledger, stoppable: Arc) -> Self { + Self { inner: CoreLedgerService::new(ledger, stoppable) } } } diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 4d84da0028..9cf74256eb 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -1020,6 +1020,7 @@ mod tests { use snarkos_account::Account; use snarkos_node_sync::BlockSync; + use snarkos_utilities::SimpleStoppable; use snarkvm::{ console::{ account::{Address, PrivateKey}, @@ -1072,7 +1073,7 @@ mod tests { let genesis_clone = genesis.clone(); let ledger = spawn_blocking!(CurrentLedger::load(genesis_clone, StorageMode::new_test(None))).unwrap(); // Initialize the ledger. - let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default())); + let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new())); // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors. let (round_to_certificates_map, committee) = { @@ -1252,10 +1253,10 @@ mod tests { spawn_blocking!(ledger.advance_to_next_block(&block))?; // Initialize the syncing ledger. - let syncing_ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap(); - let syncing_ledger = Arc::new(CoreLedgerService::new(syncing_ledger, Default::default())); + let syncing_ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::Test(None))).unwrap(); + let syncing_ledger = Arc::new(CoreLedgerService::new(syncing_ledger, SimpleStoppable::new())); + // Initialize the gateway. - let storage_mode = StorageMode::new_test(None); let gateway = Gateway::new( account.clone(), storage.clone(), @@ -1263,7 +1264,7 @@ mod tests { None, &[], false, - storage_mode, + StorageMode::Test(None), None, )?; // Initialize the block synchronization logic. 
@@ -1315,7 +1316,7 @@ mod tests { // Initialize the ledger with the genesis block. let ledger = spawn_blocking!(CurrentLedger::load(genesis, StorageMode::new_test(None))).unwrap(); // Initialize the ledger. - let core_ledger = Arc::new(CoreLedgerService::new(ledger, Default::default())); + let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), SimpleStoppable::new())); // Sample rounds of batch certificates starting at the genesis round from a static set of 4 authors. let (round_to_certificates_map, committee) = { // Initialize the committee. diff --git a/node/bft/tests/bft_e2e.rs b/node/bft/tests/bft_e2e.rs index 0b89aa7ff3..c1e788a953 100644 --- a/node/bft/tests/bft_e2e.rs +++ b/node/bft/tests/bft_e2e.rs @@ -156,10 +156,6 @@ async fn test_quorum_threshold() { const TARGET_ROUND: u64 = 4; let net = network.clone(); deadline!(Duration::from_secs(20), move || { net.is_round_reached(TARGET_ROUND) }); - - tokio::task::spawn_blocking(move || { - drop(network); - }); } #[tokio::test(flavor = "multi_thread")] @@ -194,10 +190,6 @@ async fn test_quorum_break() { // Check the nodes have stopped advancing through the rounds. 
assert!(network.is_halted().await); - - tokio::task::spawn_blocking(move || { - drop(network); - }); } #[tokio::test(flavor = "multi_thread")] @@ -259,10 +251,6 @@ async fn test_leader_election_consistency() { // Assert that all leaders are equal assert!(leaders.iter().all_equal()); } - - tokio::task::spawn_blocking(move || { - drop(network); - }); } #[tokio::test(flavor = "multi_thread")] diff --git a/node/bft/tests/common/primary.rs b/node/bft/tests/common/primary.rs index 54dc04fd98..3e65ec77db 100644 --- a/node/bft/tests/common/primary.rs +++ b/node/bft/tests/common/primary.rs @@ -18,6 +18,7 @@ use crate::common::{ TranslucentLedgerService, utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions, initialize_logger}, }; + use snarkos_account::Account; use snarkos_node_bft::{ BFT, @@ -29,6 +30,8 @@ use snarkos_node_bft::{ use snarkos_node_bft_storage_service::BFTMemoryService; use snarkos_node_network::PeerPoolHandling; use snarkos_node_sync::BlockSync; +use snarkos_utilities::SimpleStoppable; + use snarkvm::{ console::{ account::{Address, PrivateKey}, @@ -160,7 +163,7 @@ impl TestNetwork { for (id, account) in accounts.into_iter().enumerate() { let gen_ledger = genesis_ledger(gen_key, committee.clone(), balances.clone(), bonded_balances.clone(), &mut rng); - let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger, Default::default())); + let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new())); let storage = Storage::new( ledger.clone(), Arc::new(BFTMemoryService::new()), diff --git a/node/bft/tests/common/utils.rs b/node/bft/tests/common/utils.rs index 27d527a38c..c9ba1d2aec 100644 --- a/node/bft/tests/common/utils.rs +++ b/node/bft/tests/common/utils.rs @@ -22,7 +22,9 @@ use snarkos_node_bft::{ helpers::{PrimarySender, Storage}, }; -use snarkos_node_bft_storage_service::BFTMemoryService; +use snarkos_node_bft::storage_service::BFTMemoryService; +use snarkos_utilities::SimpleStoppable; + use snarkvm::{ 
console::account::Address, ledger::{ @@ -202,7 +204,7 @@ pub fn sample_ledger( let gen_ledger = primary::genesis_ledger(gen_key, committee.clone(), balances.clone(), bonded_balances.clone(), rng); - Arc::new(TranslucentLedgerService::new(gen_ledger, Default::default())) + Arc::new(TranslucentLedgerService::new(gen_ledger, SimpleStoppable::new())) } /// Samples a new storage with the given ledger. diff --git a/node/bft/tests/components/worker.rs b/node/bft/tests/components/worker.rs index 921aa8ae2d..be53686ae0 100644 --- a/node/bft/tests/components/worker.rs +++ b/node/bft/tests/components/worker.rs @@ -36,11 +36,7 @@ async fn test_resend_transmission_request() { // Initialize the accounts and the committee. let (accounts, committee) = new_test_committee(num_nodes, &mut rng); // Sample a ledger. - let accounts_ = accounts.clone(); - let committee_ = committee.clone(); - let ledger = tokio::task::spawn_blocking(move || { - sample_ledger(&accounts_, &committee_, &mut rng) - }).await.unwrap(); + let ledger = sample_ledger(&accounts, &committee, &mut rng); // Sample a worker. let worker = sample_worker(0, accounts[0].clone(), ledger.clone()); @@ -122,11 +118,7 @@ async fn test_flood_transmission_requests() { // Initialize the accounts and the committee. let (accounts, committee) = new_test_committee(num_nodes, &mut rng); // Sample a ledger. - let accounts_ = accounts.clone(); - let committee_ = committee.clone(); - let ledger = tokio::task::spawn_blocking(move || { - sample_ledger(&accounts_, &committee_, &mut rng) - }).await.unwrap(); + let ledger = sample_ledger(&accounts, &committee, &mut rng); // Sample a worker. 
let worker = sample_worker(0, accounts[0].clone(), ledger.clone()); diff --git a/node/bft/tests/gateway_e2e.rs b/node/bft/tests/gateway_e2e.rs index 32ba053bad..909341d863 100644 --- a/node/bft/tests/gateway_e2e.rs +++ b/node/bft/tests/gateway_e2e.rs @@ -33,7 +33,6 @@ use std::time::Duration; use deadline::deadline; use rand::Rng; -use tokio::task; async fn new_test_gateway( num_nodes: u16, @@ -42,7 +41,7 @@ async fn new_test_gateway( let (accounts, committee) = new_test_committee(num_nodes, rng); let accounts_ = accounts.clone(); let mut rng_ = TestRng::fixed(rng.r#gen()); - let ledger = task::spawn_blocking(move || sample_ledger(&accounts_, &committee, &mut rng_)).await.unwrap(); + let ledger = sample_ledger(&accounts_, &committee, &mut rng_); let storage = sample_storage(ledger.clone()); let gateway = sample_gateway(accounts[0].clone(), storage, ledger); diff --git a/node/bft/tests/narwhal_e2e.rs b/node/bft/tests/narwhal_e2e.rs index 8072f2592d..ea731fb70d 100644 --- a/node/bft/tests/narwhal_e2e.rs +++ b/node/bft/tests/narwhal_e2e.rs @@ -108,10 +108,6 @@ async fn test_quorum_threshold() { const TARGET_ROUND: u64 = 4; let net = network.clone(); deadline!(Duration::from_secs(20), move || { net.is_round_reached(TARGET_ROUND) }); - - tokio::task::spawn_blocking(move || { - drop(network); - }); } #[tokio::test(flavor = "multi_thread")] @@ -146,10 +142,6 @@ async fn test_quorum_break() { // Check the nodes have stopped advancing through the rounds. assert!(network.is_halted().await); - - tokio::task::spawn_blocking(move || { - drop(network); - }); } #[tokio::test(flavor = "multi_thread")] @@ -186,8 +178,4 @@ async fn test_storage_coherence() { // check only up to 3 rounds before the the target round, because the network advances when // quorum is reached, not when all the nodes have completed the round and received certificates. 
assert!(network.is_certificate_round_coherent(1..TARGET_ROUND - 2)); - - tokio::task::spawn_blocking(move || { - drop(network); - }); } diff --git a/node/cdn/Cargo.toml b/node/cdn/Cargo.toml index 1269103ed7..a7274378fe 100644 --- a/node/cdn/Cargo.toml +++ b/node/cdn/Cargo.toml @@ -45,6 +45,9 @@ workspace = true optional = true features = [ "metrics" ] +[dependencies.snarkos-utilities] +workspace = true + [dependencies.rayon] workspace = true optional = true diff --git a/node/cdn/src/blocks.rs b/node/cdn/src/blocks.rs index 4b489d195e..ab9a13db10 100644 --- a/node/cdn/src/blocks.rs +++ b/node/cdn/src/blocks.rs @@ -17,6 +17,8 @@ // https://github.com/rust-lang/rust-clippy/issues/6446 #![allow(clippy::await_holding_lock)] +use snarkos_utilities::Stoppable; + use snarkvm::prelude::{ Deserialize, DeserializeOwned, @@ -87,11 +89,11 @@ impl CdnBlockSync { pub fn new>( base_url: http::Uri, ledger: Ledger, - shutdown: Arc, + stoppable: Arc, ) -> Self { let task = { let base_url = base_url.clone(); - tokio::spawn(async move { Self::worker(base_url, ledger, shutdown).await }) + tokio::spawn(async move { Self::worker(base_url, ledger, stoppable).await }) }; debug!("Started sync from CDN at {base_url}"); @@ -119,13 +121,13 @@ impl CdnBlockSync { async fn worker>( base_url: http::Uri, ledger: Ledger, - shutdown: Arc, + stoppable: Arc, ) -> SyncResult { // Fetch the node height. let start_height = ledger.latest_height() + 1; // Load the blocks from the CDN into the ledger. 
let ledger_clone = ledger.clone(); - let result = load_blocks(&base_url, start_height, None, shutdown, move |block: Block| { + let result = load_blocks(&base_url, start_height, None, stoppable, move |block: Block| { ledger_clone.advance_to_next_block(&block) }) .await; @@ -172,7 +174,7 @@ pub async fn load_blocks( base_url: &http::Uri, start_height: u32, end_height: Option, - shutdown: Arc, + stoppable: Arc, process: impl FnMut(Block) -> Result<()> + Clone + Send + Sync + 'static, ) -> Result { // Create a Client to maintain a connection pool throughout the sync. @@ -225,16 +227,19 @@ pub async fn load_blocks( // Spawn a background task responsible for concurrent downloads. let pending_blocks_clone = pending_blocks.clone(); let base_url = base_url.to_owned(); - let shutdown_clone = shutdown.clone(); - tokio::spawn(async move { - download_block_bundles(client, &base_url, cdn_start, cdn_end, pending_blocks_clone, shutdown_clone).await; - }); + + { + let stoppable = stoppable.clone(); + tokio::spawn(async move { + download_block_bundles(client, &base_url, cdn_start, cdn_end, pending_blocks_clone, stoppable).await; + }); + } // A loop for inserting the pending blocks into the ledger. let mut current_height = start_height.saturating_sub(1); while current_height < end_height - 1 { // If we are instructed to shut down, abort. - if shutdown.load(Ordering::Acquire) { + if stoppable.is_stopped() { info!("Stopping block sync at {} - shutting down", current_height); // We can shut down cleanly from here, as the node hasn't been started yet. std::process::exit(0); @@ -269,12 +274,12 @@ pub async fn load_blocks( // Attempt to advance the ledger using the CDN block bundle. 
let mut process_clone = process.clone(); - let shutdown_clone = shutdown.clone(); + let stoppable_clone = stoppable.clone(); current_height = tokio::task::spawn_blocking(move || { threadpool.install(|| { for block in next_blocks.into_iter().filter(|b| (start_height..end_height).contains(&b.height())) { // If we are instructed to shut down, abort. - if shutdown_clone.load(Ordering::Relaxed) { + if stoppable_clone.is_stopped() { info!("Stopping block sync at {} - the node is shutting down", current_height); // We can shut down cleanly from here, as the node hasn't been started yet. std::process::exit(0); @@ -314,7 +319,7 @@ async fn download_block_bundles( cdn_start: u32, cdn_end: u32, pending_blocks: Arc>>>, - shutdown: Arc, + stoppable: Arc, ) { // Keep track of the number of concurrent requests. let active_requests: Arc = Default::default(); @@ -322,7 +327,7 @@ async fn download_block_bundles( let mut start = cdn_start; while start < cdn_end - 1 { // If we are instructed to shut down, stop downloading. - if shutdown.load(Ordering::Acquire) { + if stoppable.is_stopped() { break; } @@ -356,7 +361,7 @@ async fn download_block_bundles( let base_url_clone = base_url.clone(); let pending_blocks_clone = pending_blocks.clone(); let active_requests_clone = active_requests.clone(); - let shutdown_clone = shutdown.clone(); + let stoppable_clone = stoppable.clone(); tokio::spawn(async move { // Increment the number of active requests. 
active_requests_clone.fetch_add(1, Ordering::Relaxed); @@ -392,7 +397,7 @@ async fn download_block_bundles( attempts += 1; if attempts > MAXIMUM_REQUEST_ATTEMPTS { warn!("Maximum number of requests to {blocks_url} reached - shutting down..."); - shutdown_clone.store(true, Ordering::Relaxed); + stoppable_clone.stop(); break; } tokio::time::sleep(Duration::from_secs(attempts as u64 * 10)).await; @@ -553,8 +558,10 @@ fn log_progress( #[cfg(test)] mod tests { - use super::{BLOCKS_PER_FILE, CDN_BASE_URL, cdn_height, log_progress}; - use crate::load_blocks; + use super::{BLOCKS_PER_FILE, CDN_BASE_URL, cdn_height, load_blocks, log_progress}; + + use snarkos_utilities::SimpleStoppable; + use snarkvm::prelude::{MainnetV0, block::Block}; use http::Uri; @@ -576,7 +583,7 @@ mod tests { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let completed_height = - load_blocks(&testnet_cdn_url, start, end, Default::default(), process).await.unwrap(); + load_blocks(&testnet_cdn_url, start, end, SimpleStoppable::new(), process).await.unwrap(); assert_eq!(blocks.read().len(), expected); if expected > 0 { assert_eq!(blocks.read().last().unwrap().height(), completed_height); diff --git a/node/router/Cargo.toml b/node/router/Cargo.toml index 8dda700f96..a5e91536fc 100644 --- a/node/router/Cargo.toml +++ b/node/router/Cargo.toml @@ -18,7 +18,12 @@ edition = "2024" [features] test = [ ] -locktick = [ "dep:locktick", "snarkos-node-tcp/locktick", "snarkvm/locktick" ] +locktick = [ + "dep:locktick", + "snarkos-node-tcp/locktick", + "snarkvm/locktick", + "snarkos-node-network/locktick" +] metrics = [ "dep:snarkos-node-metrics" ] cuda = [ "snarkvm/cuda", "snarkos-account/cuda" ] serial = [ "snarkos-node-bft-ledger-service/serial" ] diff --git a/node/src/bootstrap_client/mod.rs b/node/src/bootstrap_client/mod.rs index e594475d32..c245548824 100644 --- a/node/src/bootstrap_client/mod.rs +++ b/node/src/bootstrap_client/mod.rs @@ -21,6 +21,7 @@ use crate::tcp::{self, Tcp}; use 
snarkos_account::Account; use snarkos_node_network::{ConnectionMode, Peer, Resolver}; use snarkos_node_tcp::{P2P, protocols::*}; +use snarkos_utilities::SignalHandler; use snarkvm::{ ledger::committee::Committee, prelude::{Address, Field, Header, Network, PrivateKey, ViewKey}, @@ -230,4 +231,21 @@ impl BootstrapClient { let _ = shutdown_tx.send(()); } } + + /// Blocks until a shutdown signal was received or manual shutdown was triggered. + pub async fn wait_for_signals(&self, handler: &SignalHandler) { + handler.wait_for_signals().await; + + warn!("=========================================================================================="); + warn!("⚠️ Attention - Starting the graceful shutdown procedure (ETA: 30 seconds)..."); + warn!("⚠️ Attention - To avoid DATA CORRUPTION, do NOT interrupt snarkOS (or press Ctrl+C again)"); + warn!("⚠️ Attention - Please wait until the shutdown gracefully completes (ETA: 30 seconds)"); + warn!("=========================================================================================="); + + // If the node is already initialized, then shut it down. + self.shut_down().await; + + // A best-effort attempt to let any ongoing activity conclude. 
+ tokio::time::sleep(Duration::from_secs(3)).await; + } } diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index 80c87a8df2..2ce274da88 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -15,11 +15,13 @@ mod router; -use crate::traits::NodeInterface; +use crate::{ + bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking}, + cdn::CdnBlockSync, + traits::NodeInterface, +}; use snarkos_account::Account; -use snarkos_node_bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking}; -use snarkos_node_cdn::CdnBlockSync; use snarkos_node_network::NodeType; use snarkos_node_rest::Rest; use snarkos_node_router::{ @@ -35,6 +37,8 @@ use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect, Reading}, }; +use snarkos_utilities::{SignalHandler, Stoppable}; + use snarkvm::{ console::network::Network, ledger::{ @@ -62,7 +66,6 @@ use std::{ sync::{ Arc, atomic::{ - AtomicBool, AtomicUsize, Ordering::{Acquire, Relaxed}, }, @@ -70,7 +73,6 @@ use std::{ time::Duration, }; use tokio::{ - sync::oneshot, task::JoinHandle, time::{sleep, timeout}, }; @@ -124,10 +126,10 @@ pub struct Client> { num_verifying_executions: Arc, /// The spawned handles. handles: Arc>>>, - /// The shutdown signal. - shutdown: Arc, /// Keeps track of sending pings. ping: Arc>, + /// The signal handling logic. + signal_handler: Arc, } impl> Client { @@ -143,12 +145,8 @@ impl> Client { storage_mode: StorageMode, trusted_peers_only: bool, dev: Option, - shutdown: Arc, - shutdown_tx: Option>, + signal_handler: Arc, ) -> Result { - // Initialize the signal handler. - let signal_node = Self::handle_signals(shutdown.clone(), shutdown_tx); - // Initialize the ledger. let ledger = { let storage_mode = storage_mode.clone(); @@ -159,8 +157,7 @@ impl> Client { .with_context(|| "Failed to initialize the ledger")?; // Initialize the ledger service. 
- let ledger_service = Arc::new(CoreLedgerService::::new(ledger.clone(), shutdown.clone())); - + let ledger_service = Arc::new(CoreLedgerService::::new(ledger.clone(), signal_handler.clone())); // Initialize the node router. let router = Router::new( node_ip, @@ -198,13 +195,13 @@ impl> Client { num_verifying_deploys: Default::default(), num_verifying_executions: Default::default(), handles: Default::default(), - shutdown: shutdown.clone(), + signal_handler: signal_handler.clone(), }; // Perform sync with CDN (if enabled). let cdn_sync = cdn.map(|base_url| { trace!("CDN sync is enabled"); - Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown)) + Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler)) }); // Initialize the REST server. @@ -236,8 +233,6 @@ impl> Client { node.initialize_execute_verification(); // Initialize the notification message loop. node.handles.lock().push(crate::start_notification_message_loop()); - // Pass the node to the signal handler. - let _ = signal_node.set(node.clone()); // Return the node. Ok(node) } @@ -269,11 +264,9 @@ impl> Client { // Start the block request generation loop (outgoing). let self_ = self.clone(); self.spawn(async move { - while !self_.shutdown.load(std::sync::atomic::Ordering::Acquire) { + while !self_.signal_handler.is_stopped() { // Perform the sync routine. self_.try_issuing_block_requests().await; - - // Rate limiting happens in [`Self::send_block_requests`] and no additional sleeps are needed here } info!("Stopped block request generation"); @@ -282,7 +275,7 @@ impl> Client { // Start the block response processing loop (incoming). let self_ = self.clone(); self.spawn(async move { - while !self_.shutdown.load(std::sync::atomic::Ordering::Acquire) { + while !self_.signal_handler.is_stopped() { // Wait until there is something to do or until the timeout. 
let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await; @@ -395,7 +388,7 @@ impl> Client { self.spawn(async move { loop { // If the Ctrl-C handler registered the signal, stop the node. - if node.shutdown.load(Acquire) { + if node.signal_handler.is_stopped() { info!("Shutting down solution verification"); break; } @@ -469,7 +462,7 @@ impl> Client { self.spawn(async move { loop { // If the Ctrl-C handler registered the signal, stop the node. - if node.shutdown.load(Acquire) { + if node.signal_handler.is_stopped() { info!("Shutting down deployment verification"); break; } @@ -537,7 +530,7 @@ impl> Client { self.spawn(async move { loop { // If the Ctrl-C handler registered the signal, stop the node. - if node.shutdown.load(Acquire) { + if node.signal_handler.is_stopped() { info!("Shutting down execution verification"); break; } @@ -615,7 +608,6 @@ impl> NodeInterface for Client { // Shut down the node. trace!("Shutting down the node..."); - self.shutdown.store(true, std::sync::atomic::Ordering::Release); // Abort the tasks. trace!("Shutting down the client..."); diff --git a/node/src/node.rs b/node/src/node.rs index 14e10f0f75..93ffeb6453 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -13,10 +13,19 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::{BootstrapClient, Client, Prover, Validator, traits::NodeInterface}; +use crate::{ + BootstrapClient, + Client, + Prover, + Validator, + network::{NodeType, Peer, PeerPoolHandling}, + router::Outbound, + traits::NodeInterface, +}; + use snarkos_account::Account; -use snarkos_node_network::{NodeType, Peer, PeerPoolHandling}; -use snarkos_node_router::Outbound; +use snarkos_utilities::SignalHandler; + use snarkvm::prelude::{ Address, Header, @@ -30,16 +39,12 @@ use snarkvm::prelude::{ use aleo_std::StorageMode; use anyhow::Result; + #[cfg(feature = "locktick")] use locktick::parking_lot::RwLock; #[cfg(not(feature = "locktick"))] use parking_lot::RwLock; -use std::{ - collections::HashMap, - net::SocketAddr, - sync::{Arc, atomic::AtomicBool}, -}; -use tokio::sync::oneshot; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; #[derive(Clone)] pub enum Node { @@ -69,8 +74,7 @@ impl Node { trusted_peers_only: bool, dev_txs: bool, dev: Option, - shutdown: Arc, - shutdown_tx: Option>, + signal_handler: Arc, ) -> Result { Ok(Self::Validator(Arc::new( Validator::new( @@ -87,8 +91,7 @@ impl Node { trusted_peers_only, dev_txs, dev, - shutdown, - shutdown_tx, + signal_handler, ) .await?, ))) @@ -103,8 +106,7 @@ impl Node { storage_mode: StorageMode, trusted_peers_only: bool, dev: Option, - shutdown: Arc, - shutdown_tx: Option>, + signal_handler: Arc, ) -> Result { Ok(Self::Prover(Arc::new( Prover::new( @@ -115,8 +117,7 @@ impl Node { storage_mode, trusted_peers_only, dev, - shutdown, - shutdown_tx, + signal_handler, ) .await?, ))) @@ -134,8 +135,7 @@ impl Node { storage_mode: StorageMode, trusted_peers_only: bool, dev: Option, - shutdown: Arc, - shutdown_tx: Option>, + signal_handler: Arc, ) -> Result { Ok(Self::Client(Arc::new( Client::new( @@ -149,8 +149,7 @@ impl Node { storage_mode, trusted_peers_only, dev, - shutdown, - shutdown_tx, + signal_handler, ) .await?, ))) @@ -277,4 +276,14 @@ impl Node { Self::BootstrapClient(node) => node.shut_down().await, 
} } + + /// Waits until the node receives a signal. + pub async fn wait_for_signals(&self, signal_handler: &SignalHandler) { + match self { + Self::Validator(node) => node.wait_for_signals(signal_handler).await, + Self::Prover(node) => node.wait_for_signals(signal_handler).await, + Self::Client(node) => node.wait_for_signals(signal_handler).await, + Self::BootstrapClient(node) => node.wait_for_signals(signal_handler).await, + } + } } diff --git a/node/src/prover/mod.rs b/node/src/prover/mod.rs index 433734ef89..5cedb19aeb 100644 --- a/node/src/prover/mod.rs +++ b/node/src/prover/mod.rs @@ -15,10 +15,15 @@ mod router; -use crate::traits::NodeInterface; +use crate::{ + bft::ledger_service::ProverLedgerService, + sync::{BlockSync, Ping}, + traits::NodeInterface, +}; + use snarkos_account::Account; -use snarkos_node_bft::ledger_service::ProverLedgerService; use snarkos_node_network::{NodeType, PeerPoolHandling}; + use snarkos_node_router::{ Heartbeat, Inbound, @@ -27,11 +32,12 @@ use snarkos_node_router::{ Routing, messages::{Message, UnconfirmedSolution}, }; -use snarkos_node_sync::{BlockSync, Ping}; use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect, Reading}, }; +use snarkos_utilities::{SignalHandler, Stoppable}; + use snarkvm::{ ledger::narwhal::Data, prelude::{ @@ -57,10 +63,10 @@ use std::{ net::SocketAddr, sync::{ Arc, - atomic::{AtomicBool, AtomicU8, Ordering}, + atomic::{AtomicU8, Ordering}, }, }; -use tokio::{sync::oneshot, task::JoinHandle}; +use tokio::task::JoinHandle; /// A prover is a light node, capable of producing proofs for consensus. #[derive(Clone)] @@ -83,10 +89,10 @@ pub struct Prover> { max_puzzle_instances: u8, /// The spawned handles. handles: Arc>>>, - /// The shutdown signal. - shutdown: Arc, /// Keeps track of sending pings. ping: Arc>, + /// The signal handling logic. + signal_handler: Arc, /// PhantomData. 
_phantom: PhantomData, } @@ -101,12 +107,8 @@ impl> Prover { storage_mode: StorageMode, trusted_peers_only: bool, dev: Option, - shutdown: Arc, - shutdown_tx: Option>, + signal_handler: Arc, ) -> Result { - // Initialize the signal handler. - let signal_node = Self::handle_signals(shutdown.clone(), shutdown_tx); - // Initialize the ledger service. let ledger_service = Arc::new(ProverLedgerService::new()); @@ -144,7 +146,7 @@ impl> Prover { max_puzzle_instances: u8::try_from(max_puzzle_instances)?, handles: Default::default(), ping, - shutdown, + signal_handler, _phantom: Default::default(), }; // Initialize the routing. @@ -153,8 +155,7 @@ impl> Prover { node.initialize_puzzle().await; // Initialize the notification message loop. node.handles.lock().push(crate::start_notification_message_loop()); - // Pass the node to the signal handler. - let _ = signal_node.set(node.clone()); + // Return the node. Ok(node) } @@ -172,7 +173,6 @@ impl> NodeInterface for Prover { // Shut down the puzzle. debug!("Shutting down the puzzle..."); - self.shutdown.store(true, Ordering::Release); // Abort the tasks. debug!("Shutting down the prover..."); @@ -243,7 +243,7 @@ impl> Prover { } // If the Ctrl-C handler registered the signal, stop the prover. 
- if self.shutdown.load(Ordering::Acquire) { + if self.signal_handler.is_stopped() { debug!("Shutting down the puzzle..."); break; } diff --git a/node/src/traits.rs b/node/src/traits.rs index 735b37d0dd..b4a5c40de9 100644 --- a/node/src/traits.rs +++ b/node/src/traits.rs @@ -15,19 +15,12 @@ use snarkos_node_network::{NodeType, PeerPoolHandling}; use snarkos_node_router::Routing; + +use snarkos_utilities::SignalHandler; + use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey}; -use once_cell::sync::OnceCell; -use std::{ - future::Future, - io, - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, - time::Duration, -}; -use tokio::sync::oneshot; +use std::time::Duration; #[async_trait] pub trait NodeInterface: Routing { @@ -56,69 +49,21 @@ pub trait NodeInterface: Routing { self.router().is_dev() } - /// Handles OS signals for the node to intercept and perform a clean shutdown. - /// The optional `shutdown_flag` flag can be used to cleanly terminate the syncing process. - fn handle_signals(shutdown_flag: Arc, shutdown_tx: Option>) -> Arc> { - // In order for the signal handler to be started as early as possible, a reference to the node needs - // to be passed to it at a later time. - let node: Arc> = Default::default(); - - #[cfg(target_family = "unix")] - fn signal_listener() -> impl Future> { - use tokio::signal::unix::{SignalKind, signal}; - - // Handle SIGINT, SIGTERM, SIGQUIT, and SIGHUP. - let mut s_int = signal(SignalKind::interrupt()).unwrap(); - let mut s_term = signal(SignalKind::terminate()).unwrap(); - let mut s_quit = signal(SignalKind::quit()).unwrap(); - let mut s_hup = signal(SignalKind::hangup()).unwrap(); - - // Return when any of the signals above is received. 
- async move { - tokio::select!( - _ = s_int.recv() => (), - _ = s_term.recv() => (), - _ = s_quit.recv() => (), - _ = s_hup.recv() => (), - ); - Ok(()) - } - } - #[cfg(not(target_family = "unix"))] - fn signal_listener() -> impl Future> { - tokio::signal::ctrl_c() - } - - let node_clone = node.clone(); - tokio::task::spawn(async move { - match signal_listener().await { - Ok(()) => { - warn!("=========================================================================================="); - warn!("⚠️ Attention - Starting the graceful shutdown procedure (ETA: 30 seconds)..."); - warn!("⚠️ Attention - To avoid DATA CORRUPTION, do NOT interrupt snarkOS (or press Ctrl+C again)"); - warn!("⚠️ Attention - Please wait until the shutdown gracefully completes (ETA: 30 seconds)"); - warn!("=========================================================================================="); - - match node_clone.get() { - // If the node is already initialized, then shut it down. - Some(node) => node.shut_down().await, - // Otherwise, if the node is not yet initialized, then set the shutdown flag directly. - None => shutdown_flag.store(true, Ordering::Relaxed), - } - - // A best-effort attempt to let any ongoing activity conclude. - tokio::time::sleep(Duration::from_secs(3)).await; - - // Terminate the process. - if let Some(tx) = shutdown_tx { - let _ = tx.send(()); - } - } - Err(error) => error!("tokio::signal::ctrl_c encountered an error: {}", error), - } - }); - - node + /// Blocks until a shutdown signal was received or manual shutdown was triggered. 
+ async fn wait_for_signals(&self, handler: &SignalHandler) { + handler.wait_for_signals().await; + + warn!("=========================================================================================="); + warn!("⚠️ Attention - Starting the graceful shutdown procedure (ETA: 30 seconds)..."); + warn!("⚠️ Attention - To avoid DATA CORRUPTION, do NOT interrupt snarkOS (or press Ctrl+C again)"); + warn!("⚠️ Attention - Please wait until the shutdown gracefully completes (ETA: 30 seconds)"); + warn!("=========================================================================================="); + + // If the node is already initialized, then shut it down. + self.shut_down().await; + + // A best-effort attempt to let any ongoing activity conclude. + tokio::time::sleep(Duration::from_secs(3)).await; } /// Shuts down the node. diff --git a/node/src/validator/mod.rs b/node/src/validator/mod.rs index 7b8c3e5304..c3d75d6e9e 100644 --- a/node/src/validator/mod.rs +++ b/node/src/validator/mod.rs @@ -36,6 +36,8 @@ use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect, Reading}, }; +use snarkos_utilities::SignalHandler; + use snarkvm::prelude::{ Ledger, Network, @@ -51,12 +53,8 @@ use core::future::Future; use locktick::parking_lot::Mutex; #[cfg(not(feature = "locktick"))] use parking_lot::Mutex; -use std::{ - net::SocketAddr, - sync::{Arc, atomic::AtomicBool}, - time::Duration, -}; -use tokio::{sync::oneshot, task::JoinHandle}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; +use tokio::task::JoinHandle; /// A validator is a full node, capable of validating blocks. #[derive(Clone)] @@ -73,8 +71,6 @@ pub struct Validator> { sync: Arc>, /// The spawned handles. handles: Arc>>>, - /// The shutdown signal. - shutdown: Arc, /// Keeps track of sending pings. 
ping: Arc>, } @@ -95,12 +91,8 @@ impl> Validator { trusted_peers_only: bool, dev_txs: bool, dev: Option, - shutdown: Arc, - shutdown_tx: Option>, + signal_handler: Arc, ) -> Result { - // Initialize the signal handler. - let signal_node = Self::handle_signals(shutdown.clone(), shutdown_tx); - // Initialize the ledger. let ledger = { let storage_mode = storage_mode.clone(); @@ -111,7 +103,7 @@ impl> Validator { .with_context(|| "Failed to initialize the ledger")?; // Initialize the ledger service. - let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone())); + let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), signal_handler.clone())); // Initialize the node router. let router = Router::new( @@ -155,11 +147,10 @@ impl> Validator { sync: sync.clone(), ping, handles: Default::default(), - shutdown: shutdown.clone(), }; // Perform sync with CDN (if enabled). - let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown))); + let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))); // Initialize the transaction pool. node.initialize_transaction_pool(dev, dev_txs)?; @@ -193,8 +184,7 @@ impl> Validator { node.initialize_routing().await; // Initialize the notification message loop. node.handles.lock().push(crate::start_notification_message_loop()); - // Pass the node to the signal handler. - let _ = signal_node.set(node.clone()); + // Return the node. Ok(node) } @@ -468,7 +458,6 @@ impl> NodeInterface for Validator { // Shut down the node. trace!("Shutting down the node..."); - self.shutdown.store(true, std::sync::atomic::Ordering::Release); // Abort the tasks. 
trace!("Shutting down the validator..."); @@ -538,8 +527,7 @@ mod tests { false, dev_txs, None, - Default::default(), - None, + SignalHandler::new(), ) .await .unwrap(); diff --git a/node/tests/common/node.rs b/node/tests/common/node.rs index 1021339649..2e55778e88 100644 --- a/node/tests/common/node.rs +++ b/node/tests/common/node.rs @@ -14,8 +14,11 @@ // limitations under the License. use crate::common::test_peer::sample_genesis_block; + use snarkos_account::Account; use snarkos_node::{Client, Prover, Validator}; +use snarkos_utilities::SignalHandler; + use snarkvm::prelude::{MainnetV0 as CurrentNetwork, store::helpers::memory::ConsensusMemory}; use aleo_std::StorageMode; @@ -33,8 +36,7 @@ pub async fn client() -> Client> StorageMode::new_test(None), false, // Connect to untrusted peers. None, - Default::default(), - None, + SignalHandler::new(), ) .await .expect("couldn't create client instance") @@ -49,8 +51,7 @@ pub async fn prover() -> Prover> StorageMode::new_test(None), false, None, - Default::default(), - None, + SignalHandler::new(), ) .await .expect("couldn't create prover instance") @@ -71,8 +72,7 @@ pub async fn validator() -> Validator" ] +description = "Utilities for a decentralized operating system" +homepage = "https://aleo.org" +repository = "https://github.com/ProvableHQ/snarkOS" +keywords = [ + "aleo", + "cryptography", + "blockchain", + "decentralized", + "zero-knowledge" +] +include = ["../LICENSE.md"] +categories = [ "cryptography", "cryptography::cryptocurrencies", "os" ] +license = "Apache-2.0" +edition = "2024" + +[dependencies.tokio] +workspace = true +features = [ "macros", "signal" ] + +[dependencies.tracing] +workspace = true diff --git a/utilities/src/lib.rs b/utilities/src/lib.rs new file mode 100644 index 0000000000..de7548d27a --- /dev/null +++ b/utilities/src/lib.rs @@ -0,0 +1,19 @@ +// Copyright (c) 2019-2025 Provable Inc. +// This file is part of the snarkOS library. 
+ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Utilities for signal and shutdown handling. +pub mod signals; + +pub use signals::*; diff --git a/utilities/src/signals.rs b/utilities/src/signals.rs new file mode 100644 index 0000000000..99f1006a8c --- /dev/null +++ b/utilities/src/signals.rs @@ -0,0 +1,140 @@ +// Copyright (c) 2019-2025 Provable Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; +use tokio::sync::Notify; + +use tracing::{debug, error}; + +/// Generic trait that can be queried for whether current process should be stopped. +/// This is implemented by `SignalHandler` and `SimpleStoppable`. +pub trait Stoppable: Send + Sync { + /// Initiates shutdown of the node. + fn stop(&self); + + /// Returns `true` if the node is (in the process of being) stopped. 
+    fn is_stopped(&self) -> bool;
+}
+
+/// Wrapper around `AtomicBool` that implements the `Stoppable` trait.
+///
+/// This is useful when no signal or complex shutdown handling is necessary (e.g., in a test environment).
+pub struct SimpleStoppable {
+    state: AtomicBool,
+}
+
+impl SimpleStoppable {
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self { state: AtomicBool::new(false) })
+    }
+}
+
+impl Stoppable for SimpleStoppable {
+    fn stop(&self) {
+        self.state.store(true, Ordering::SeqCst);
+    }
+
+    fn is_stopped(&self) -> bool {
+        self.state.load(Ordering::SeqCst)
+    }
+}
+
+/// Helper for signal handling that implements the `Stoppable` trait.
+///
+/// This struct will set itself to "stopped" as soon as the process receives Ctrl+C.
+/// It can also be manually stopped (e.g., when the node encounters a fatal error).
+pub struct SignalHandler {
+    stopped: AtomicBool,
+    notify: Notify,
+}
+
+impl SignalHandler {
+    /// Spawns a background task that listens for Ctrl+C and returns `Self`.
+    pub fn new() -> Arc<Self> {
+        let obj = Arc::new(Self { stopped: AtomicBool::new(false), notify: Default::default() });
+
+        {
+            let obj = obj.clone();
+            tokio::spawn(async move {
+                obj.handle_signals().await;
+            });
+        }
+
+        obj
+    }
+
+    /// Logic for the background task that waits for a signal.
+    async fn handle_signals(&self) {
+        #[cfg(target_family = "unix")]
+        let signal_listener = async move {
+            use tokio::signal::unix::{SignalKind, signal};
+
+            // Handle SIGINT, SIGTERM, SIGQUIT, and SIGHUP.
+ let mut s_int = signal(SignalKind::interrupt())?; + let mut s_term = signal(SignalKind::terminate())?; + let mut s_quit = signal(SignalKind::quit())?; + let mut s_hup = signal(SignalKind::hangup())?; + + tokio::select!( + _ = s_int.recv() => debug!("Received SIGINT"), + _ = s_term.recv() => debug!("Received SIGTERM"), + _ = s_quit.recv() => debug!("Received SIGQUIT"), + _ = s_hup.recv() => debug!("Received SIGHUP"), + ); + + std::io::Result::<()>::Ok(()) + }; + + #[cfg(not(target_family = "unix"))] + let signal_listener = async move { + tokio::signal::ctrl_c().await?; + debug!("Received signal"); + + std::io::Result::<()>::Ok(()) + }; + + // Block until the signal. + match signal_listener.await { + Ok(()) => {} + Err(error) => { + error!("tokio::signal encountered an error: {error}"); + } + } + + self.stop(); + } + + /// Blocks until the signal handler was invoked or the stopped flag was set some other way. + /// Note: This can only be called once, and must not be called concurrently. 
+ pub async fn wait_for_signals(&self) { + while !self.is_stopped() { + self.notify.notified().await + } + } +} + +impl Stoppable for SignalHandler { + fn stop(&self) { + self.stopped.store(true, Ordering::SeqCst); + self.notify.notify_one(); + } + + fn is_stopped(&self) -> bool { + self.stopped.load(Ordering::SeqCst) + } +} From 320b8a6c5bde6b8cc70357aaadaca7ce77a1484c Mon Sep 17 00:00:00 2001 From: Kai Mast Date: Mon, 24 Nov 2025 15:08:27 -0800 Subject: [PATCH 2/6] misc(node/utils): apply reviewer comments for shutdown handling --- Cargo.lock | 2 ++ node/bft/src/sync/mod.rs | 2 +- node/bft/tests/gateway_e2e.rs | 3 ++- node/src/bootstrap_client/mod.rs | 9 ------- node/src/traits.rs | 11 --------- utilities/Cargo.toml | 9 ++++++- utilities/src/signals.rs | 41 ++++++++++++++++++++++++-------- 7 files changed, 44 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fcb7a1ca0d..4131f9b586 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4532,6 +4532,8 @@ dependencies = [ name = "snarkos-utilities" version = "4.2.1" dependencies = [ + "locktick", + "parking_lot", "tokio", "tracing", ] diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 9cf74256eb..a8c0971054 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -1264,7 +1264,7 @@ mod tests { None, &[], false, - StorageMode::Test(None), + StorageMode::new_test(None), None, )?; // Initialize the block synchronization logic. 
diff --git a/node/bft/tests/gateway_e2e.rs b/node/bft/tests/gateway_e2e.rs index 909341d863..32ba053bad 100644 --- a/node/bft/tests/gateway_e2e.rs +++ b/node/bft/tests/gateway_e2e.rs @@ -33,6 +33,7 @@ use std::time::Duration; use deadline::deadline; use rand::Rng; +use tokio::task; async fn new_test_gateway( num_nodes: u16, @@ -41,7 +42,7 @@ async fn new_test_gateway( let (accounts, committee) = new_test_committee(num_nodes, rng); let accounts_ = accounts.clone(); let mut rng_ = TestRng::fixed(rng.r#gen()); - let ledger = sample_ledger(&accounts_, &committee, &mut rng_); + let ledger = task::spawn_blocking(move || sample_ledger(&accounts_, &committee, &mut rng_)).await.unwrap(); let storage = sample_storage(ledger.clone()); let gateway = sample_gateway(accounts[0].clone(), storage, ledger); diff --git a/node/src/bootstrap_client/mod.rs b/node/src/bootstrap_client/mod.rs index c245548824..cca28578f4 100644 --- a/node/src/bootstrap_client/mod.rs +++ b/node/src/bootstrap_client/mod.rs @@ -236,16 +236,7 @@ impl BootstrapClient { pub async fn wait_for_signals(&self, handler: &SignalHandler) { handler.wait_for_signals().await; - warn!("=========================================================================================="); - warn!("⚠️ Attention - Starting the graceful shutdown procedure (ETA: 30 seconds)..."); - warn!("⚠️ Attention - To avoid DATA CORRUPTION, do NOT interrupt snarkOS (or press Ctrl+C again)"); - warn!("⚠️ Attention - Please wait until the shutdown gracefully completes (ETA: 30 seconds)"); - warn!("=========================================================================================="); - // If the node is already initialized, then shut it down. self.shut_down().await; - - // A best-effort attempt to let any ongoing activity conclude. 
- tokio::time::sleep(Duration::from_secs(3)).await; } } diff --git a/node/src/traits.rs b/node/src/traits.rs index b4a5c40de9..dd7a4155d4 100644 --- a/node/src/traits.rs +++ b/node/src/traits.rs @@ -20,8 +20,6 @@ use snarkos_utilities::SignalHandler; use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey}; -use std::time::Duration; - #[async_trait] pub trait NodeInterface: Routing { /// Returns the node type. @@ -53,17 +51,8 @@ pub trait NodeInterface: Routing { async fn wait_for_signals(&self, handler: &SignalHandler) { handler.wait_for_signals().await; - warn!("=========================================================================================="); - warn!("⚠️ Attention - Starting the graceful shutdown procedure (ETA: 30 seconds)..."); - warn!("⚠️ Attention - To avoid DATA CORRUPTION, do NOT interrupt snarkOS (or press Ctrl+C again)"); - warn!("⚠️ Attention - Please wait until the shutdown gracefully completes (ETA: 30 seconds)"); - warn!("=========================================================================================="); - // If the node is already initialized, then shut it down. self.shut_down().await; - - // A best-effort attempt to let any ongoing activity conclude. - tokio::time::sleep(Duration::from_secs(3)).await; } /// Shuts down the node. 
diff --git a/utilities/Cargo.toml b/utilities/Cargo.toml index 85da7b08a1..c03dc9317f 100644 --- a/utilities/Cargo.toml +++ b/utilities/Cargo.toml @@ -19,7 +19,14 @@ edition = "2024" [dependencies.tokio] workspace = true -features = [ "macros", "signal" ] +features = [ "macros", "signal", "sync" ] [dependencies.tracing] workspace = true + +[dependencies.locktick] +optional = true +workspace = true + +[dependencies.parking_lot] +workspace = true diff --git a/utilities/src/signals.rs b/utilities/src/signals.rs index 99f1006a8c..885f6cc54d 100644 --- a/utilities/src/signals.rs +++ b/utilities/src/signals.rs @@ -13,11 +13,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(feature = "locktick")] +use locktick::parking_lot::RwLock; +#[cfg(not(feature = "locktick"))] +use parking_lot::RwLock; + use std::sync::{ Arc, atomic::{AtomicBool, Ordering}, }; -use tokio::sync::Notify; +use tokio::sync::oneshot; use tracing::{debug, error}; @@ -59,14 +64,22 @@ impl Stoppable for SimpleStoppable { /// This struct will set itself to "stopped" as soon as the process receives Ctrl+C. /// It can also be manually stopped (e.g., when the node encounters a fatal error). pub struct SignalHandler { - stopped: AtomicBool, - notify: Notify, + /// This sender is used to notify a waiting task that the node has been stopped. + /// If this is `None`, the node is in the process of shutting down. + stopped_sender: RwLock>>, + + /// This receiver is used to wait for the node to be stopped. + stopped_receiver: RwLock>>, } impl SignalHandler { /// Spawns a background tasks that listens for Ctrl+C and returns `Self`. 
pub fn new() -> Arc { - let obj = Arc::new(Self { stopped: AtomicBool::new(false), notify: Default::default() }); + let (stopped_sender, stopped_receiver) = oneshot::channel(); + let obj = Arc::new(Self { + stopped_sender: RwLock::new(Some(stopped_sender)), + stopped_receiver: RwLock::new(Some(stopped_receiver)), + }); { let obj = obj.clone(); @@ -119,22 +132,30 @@ impl SignalHandler { self.stop(); } - /// Blocks until the signal handler was invoked or the stopped flag was set some other way. + /// Waits until the signal handler was invoked or the stopped flag was set some other way. + /// /// Note: This can only be called once, and must not be called concurrently. pub async fn wait_for_signals(&self) { - while !self.is_stopped() { - self.notify.notified().await + let receiver = self.stopped_receiver.write().take(); + + if let Some(receiver) = receiver { + if let Err(err) = receiver.await { + error!("wait_for_signals encountered an error: {err}"); + } + } else { + panic!("wait_for_signals must be called at most once"); } } } impl Stoppable for SignalHandler { fn stop(&self) { - self.stopped.store(true, Ordering::SeqCst); - self.notify.notify_one(); + if let Some(stopped_sender) = self.stopped_sender.write().take() { + let _ = stopped_sender.send(()); + } } fn is_stopped(&self) -> bool { - self.stopped.load(Ordering::SeqCst) + self.stopped_sender.read().is_none() } } From 543db61b4f73fafd0fa83eb7381064dbd4a2b98f Mon Sep 17 00:00:00 2001 From: Kai Mast Date: Mon, 1 Dec 2025 14:43:12 -0800 Subject: [PATCH 3/6] misc(node/cdn): use SignalHandler directly instead of Stoppable --- node/cdn/src/blocks.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/cdn/src/blocks.rs b/node/cdn/src/blocks.rs index ab9a13db10..b984ac7445 100644 --- a/node/cdn/src/blocks.rs +++ b/node/cdn/src/blocks.rs @@ -17,7 +17,7 @@ // https://github.com/rust-lang/rust-clippy/issues/6446 #![allow(clippy::await_holding_lock)] -use snarkos_utilities::Stoppable; +use 
snarkos_utilities::{SignalHandler, Stoppable}; use snarkvm::prelude::{ Deserialize, @@ -89,7 +89,7 @@ impl CdnBlockSync { pub fn new>( base_url: http::Uri, ledger: Ledger, - stoppable: Arc, + stoppable: Arc, ) -> Self { let task = { let base_url = base_url.clone(); From aa6ddfc570cf52a4e5459603837ebddb300af89b Mon Sep 17 00:00:00 2001 From: Kai Mast Date: Mon, 1 Dec 2025 15:01:03 -0800 Subject: [PATCH 4/6] build(node): add locktick feature where needed --- cli/Cargo.toml | 11 +++++++++-- display/Cargo.toml | 5 +++++ node/Cargo.toml | 9 ++++++--- node/bft/Cargo.toml | 1 + node/bft/ledger-service/Cargo.toml | 10 ++++++++-- node/cdn/Cargo.toml | 15 +++++++++++---- 6 files changed, 40 insertions(+), 11 deletions(-) diff --git a/cli/Cargo.toml b/cli/Cargo.toml index f7b9c6e36a..22c54809a3 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -17,12 +17,15 @@ license = "Apache-2.0" edition = "2024" [features] -async = [] +default = [ ] +async = [ ] locktick = [ "dep:locktick", + "snarkos-display/locktick", "snarkos-node/locktick", "snarkos-node-cdn/locktick", "snarkos-node-rest/locktick", + "snarkos-utilities/locktick", "snarkvm/locktick" ] metrics = [ "dep:snarkos-node-metrics", "snarkos-node/metrics" ] @@ -40,7 +43,11 @@ test_network = [ "test_consensus_heights", "snarkvm/dev-print" ] -serial = [ ] +serial = [ + "snarkvm/serial", + "snarkos-node/serial", + "snarkos-display/serial", +] tokio_console = [ "dep:console-subscriber", "tokio/tracing" ] [dependencies.aleo-std] diff --git a/display/Cargo.toml b/display/Cargo.toml index 4b5f034cff..ed9e7081f7 100644 --- a/display/Cargo.toml +++ b/display/Cargo.toml @@ -16,6 +16,11 @@ categories = [ "cryptography", "cryptography::cryptocurrencies", "os" ] license = "Apache-2.0" edition = "2024" +[features] +default = [ ] +locktick = [ "snarkos-node/locktick", "snarkos-utilities/locktick" ] +serial = [ "snarkvm/serial" ] + [dependencies.anyhow] workspace = true diff --git a/node/Cargo.toml b/node/Cargo.toml index 
9f9b4b28a0..063755359c 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -17,8 +17,7 @@ license = "Apache-2.0" edition = "2024" [features] -default = [ "parallel" ] -parallel = [ ] #TODO(kaimast): is this still used? +default = [ ] timer = [ "aleo-std/timer" ] locktick = [ "dep:locktick", @@ -30,6 +29,7 @@ locktick = [ "snarkos-node-router/locktick", "snarkos-node-sync/locktick", "snarkos-node-tcp/locktick", + "snarkos-utilities/locktick", "snarkvm/locktick" ] metrics = [ @@ -50,7 +50,10 @@ cuda = [ "snarkos-node-router/cuda", "snarkos-node-sync/cuda" ] -serial = [ "snarkos-node-bft/serial" ] +serial = [ + "snarkvm/serial", + "snarkos-node-bft/serial" +] test = [] [dependencies.aleo-std] diff --git a/node/bft/Cargo.toml b/node/bft/Cargo.toml index f3dfef71c4..27aed9cb86 100644 --- a/node/bft/Cargo.toml +++ b/node/bft/Cargo.toml @@ -25,6 +25,7 @@ locktick = [ "snarkos-node-bft-storage-service/locktick", "snarkos-node-sync/locktick", "snarkos-node-tcp/locktick", + "snarkos-utilities/locktick", "snarkvm/locktick" ] metrics = [ diff --git a/node/bft/ledger-service/Cargo.toml b/node/bft/ledger-service/Cargo.toml index e77a913704..aa3febe6f4 100644 --- a/node/bft/ledger-service/Cargo.toml +++ b/node/bft/ledger-service/Cargo.toml @@ -20,12 +20,18 @@ edition = "2024" default = [ ] ledger = [ "parking_lot", "rand", "rayon", "tokio", "tracing" ] ledger-write = [ ] -locktick = [ "dep:locktick", "snarkvm/locktick" ] +locktick = [ + "dep:locktick", + "snarkvm/locktick", + "snarkos-utilities/locktick" +] metrics = [ "dep:snarkos-node-metrics", "snarkvm/metrics" ] mock = [ "parking_lot", "tracing" ] prover = [ ] cuda = [ "snarkvm/cuda" ] -serial = [ ] +serial = [ + "snarkvm/serial" +] test = [ "mock", "translucent" ] translucent = [ "ledger" ] diff --git a/node/cdn/Cargo.toml b/node/cdn/Cargo.toml index a7274378fe..35ac5c8ff8 100644 --- a/node/cdn/Cargo.toml +++ b/node/cdn/Cargo.toml @@ -17,9 +17,17 @@ license = "Apache-2.0" edition = "2024" [features] -default = [ "parallel" 
]
-locktick = [ "dep:locktick", "snarkvm/locktick" ]
-parallel = [ "rayon" ]
+default = [ ]
+serial = [
+    "snarkos-node-metrics/serial",
+    "snarkvm/serial",
+]
+locktick = [
+    "dep:locktick",
+    "snarkvm/locktick",
+    "snarkos-node-metrics/locktick",
+    "snarkos-utilities/locktick"
+]
 cuda = [ "snarkvm/cuda" ]
 
 metrics = [ "dep:snarkos-node-metrics" ]
@@ -50,7 +58,6 @@ workspace = true
 
 [dependencies.rayon]
 workspace = true
-optional = true
 
 [dependencies.reqwest]
 version = "0.12"

From 4cd95b4fb2f4f934a083915cbbdafef8631926f2 Mon Sep 17 00:00:00 2001
From: Kai Mast
Date: Mon, 1 Dec 2025 17:26:20 -0800
Subject: [PATCH 5/6] test(node/bft): fix test failing due to blocking tasks

---
 node/bft/Cargo.toml                 |  5 ++++-
 node/bft/tests/components/worker.rs | 10 ++++++++--
 node/consensus/Cargo.toml           |  7 +++++--
 node/metrics/Cargo.toml             | 10 ++++++----
 4 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/node/bft/Cargo.toml b/node/bft/Cargo.toml
index 27aed9cb86..3ae7b8d5f9 100644
--- a/node/bft/Cargo.toml
+++ b/node/bft/Cargo.toml
@@ -45,7 +45,10 @@ test = [
   "snarkos-node-bft-ledger-service/test",
   "snarkos-node-bft-storage-service/test"
 ]
-serial = [ "snarkos-node-bft-ledger-service/serial" ]
+serial = [
+    "snarkos-node-metrics/serial",
+    "snarkos-node-bft-ledger-service/serial"
+]
 
 [dependencies.aleo-std]
 workspace = true

diff --git a/node/bft/tests/components/worker.rs b/node/bft/tests/components/worker.rs
index be53686ae0..aca874767d 100644
--- a/node/bft/tests/components/worker.rs
+++ b/node/bft/tests/components/worker.rs
@@ -36,7 +36,10 @@ async fn test_resend_transmission_request() {
     // Initialize the accounts and the committee.
     let (accounts, committee) = new_test_committee(num_nodes, &mut rng);
     // Sample a ledger.
-    let ledger = sample_ledger(&accounts, &committee, &mut rng);
+    let ledger = {
+        let accounts = accounts.clone();
+        tokio::task::spawn_blocking(move || sample_ledger(&accounts, &committee, &mut rng)).await.unwrap()
+    };
     // Sample a worker.
let worker = sample_worker(0, accounts[0].clone(), ledger.clone()); @@ -118,7 +121,10 @@ async fn test_flood_transmission_requests() { // Initialize the accounts and the committee. let (accounts, committee) = new_test_committee(num_nodes, &mut rng); // Sample a ledger. - let ledger = sample_ledger(&accounts, &committee, &mut rng); + let ledger = { + let accounts = accounts.clone(); + tokio::task::spawn_blocking(move || sample_ledger(&accounts, &committee, &mut rng)).await.unwrap() + }; // Sample a worker. let worker = sample_worker(0, accounts[0].clone(), ledger.clone()); diff --git a/node/consensus/Cargo.toml b/node/consensus/Cargo.toml index 49753927f8..7d6b75a66a 100644 --- a/node/consensus/Cargo.toml +++ b/node/consensus/Cargo.toml @@ -28,8 +28,11 @@ locktick = [ metrics = [ "dep:snarkos-node-metrics" ] telemetry = [ "snarkos-node-bft/telemetry" ] cuda = [ "snarkvm/cuda", "snarkos-account/cuda", "snarkos-node-bft-ledger-service/cuda" ] -serial = [ "snarkos-node-bft-ledger-service/serial" ] - +serial = [ + "snarkos-node-bft-ledger-service/serial", + "snarkos-node-metrics/serial", + "snarkvm/serial" +] [dependencies.aleo-std] workspace = true diff --git a/node/metrics/Cargo.toml b/node/metrics/Cargo.toml index 1c72cf774b..3f89b6a615 100644 --- a/node/metrics/Cargo.toml +++ b/node/metrics/Cargo.toml @@ -17,10 +17,13 @@ license = "Apache-2.0" edition = "2024" [features] -default = [ "rayon", "snarkvm/metrics" ] -locktick = [ "dep:locktick", "snarkvm/locktick" ] +default = [ "metrics" ] +locktick = [ + "dep:locktick", + "snarkvm/locktick" +] metrics = [ "snarkvm/metrics" ] -serial = [ ] +serial = [ "snarkvm/serial" ] [dependencies.locktick] workspace = true @@ -35,7 +38,6 @@ workspace = true [dependencies.rayon] workspace = true -optional = true [dependencies.snarkvm] workspace = true From 6ad7d7105dce287a891173c1ba131e2d213b43ff Mon Sep 17 00:00:00 2001 From: Kai Mast Date: Tue, 2 Dec 2025 10:10:42 -0800 Subject: [PATCH 6/6] misc(cli): apply more reviewer feedback 
for shutdown handling --- cli/src/commands/start.rs | 22 +++++++-------------- utilities/src/signals.rs | 40 +++++++++++++++++---------------------- 2 files changed, 24 insertions(+), 38 deletions(-) diff --git a/cli/src/commands/start.rs b/cli/src/commands/start.rs index 32d463e599..c35a9bcd70 100644 --- a/cli/src/commands/start.rs +++ b/cli/src/commands/start.rs @@ -288,26 +288,15 @@ impl Start { Self::runtime().block_on(async move { // Error messages. let node_parse_error = || "Failed to start node"; - let signal_handler = SignalHandler::new(); // Clone the configurations. let mut self_ = self.clone(); // Parse the node arguments, start it, and block until shutdown. match self_.network { - MainnetV0::ID => self_ - .parse_node::(log_receiver, signal_handler.clone()) - .await - .with_context(node_parse_error)?, - - TestnetV0::ID => self_ - .parse_node::(log_receiver, signal_handler.clone()) - .await - .with_context(node_parse_error)?, - CanaryV0::ID => self_ - .parse_node::(log_receiver, signal_handler.clone()) - .await - .with_context(node_parse_error)?, + MainnetV0::ID => self_.parse_node::(log_receiver).await.with_context(node_parse_error)?, + TestnetV0::ID => self_.parse_node::(log_receiver).await.with_context(node_parse_error)?, + CanaryV0::ID => self_.parse_node::(log_receiver).await.with_context(node_parse_error)?, _ => panic!("Invalid network ID specified"), }; @@ -577,7 +566,7 @@ impl Start { /// Start the node and blocks until it terminates. #[rustfmt::skip] - async fn parse_node(&mut self, log_receiver: mpsc::Receiver>, signal_handler: Arc) -> Result<()> { + async fn parse_node(&mut self, log_receiver: mpsc::Receiver>) -> Result<()> { if !self.nobanner { // Print the welcome banner. println!("{}", crate::helpers::welcome_message()); @@ -722,6 +711,9 @@ impl Start { println!("🪧 The terminal UI will not start until the node has finished syncing from the CDN. 
If this step takes too long, consider restarting with `--nodisplay`."); } + // Register the signal handler. + let signal_handler = SignalHandler::new(); + // Initialize the node. let node = match node_type { NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, self.trusted_peers_only, dev_txs, self.dev, signal_handler.clone()).await, diff --git a/utilities/src/signals.rs b/utilities/src/signals.rs index 885f6cc54d..83809d1b2a 100644 --- a/utilities/src/signals.rs +++ b/utilities/src/signals.rs @@ -14,9 +14,9 @@ // limitations under the License. #[cfg(feature = "locktick")] -use locktick::parking_lot::RwLock; +use locktick::parking_lot::{Mutex, RwLock}; #[cfg(not(feature = "locktick"))] -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use std::sync::{ Arc, @@ -24,7 +24,7 @@ use std::sync::{ }; use tokio::sync::oneshot; -use tracing::{debug, error}; +use tracing::{debug, error, trace}; /// Generic trait that can be queried for whether current process should be stopped. /// This is implemented by `SignalHandler` and `SimpleStoppable`. @@ -69,7 +69,7 @@ pub struct SignalHandler { stopped_sender: RwLock>>, /// This receiver is used to wait for the node to be stopped. 
- stopped_receiver: RwLock>>, + stopped_receiver: Mutex>>, } impl SignalHandler { @@ -78,7 +78,7 @@ impl SignalHandler { let (stopped_sender, stopped_receiver) = oneshot::channel(); let obj = Arc::new(Self { stopped_sender: RwLock::new(Some(stopped_sender)), - stopped_receiver: RwLock::new(Some(stopped_receiver)), + stopped_receiver: Mutex::new(Some(stopped_receiver)), }); { @@ -104,10 +104,10 @@ impl SignalHandler { let mut s_hup = signal(SignalKind::hangup())?; tokio::select!( - _ = s_int.recv() => debug!("Received SIGINT"), - _ = s_term.recv() => debug!("Received SIGTERM"), - _ = s_quit.recv() => debug!("Received SIGQUIT"), - _ = s_hup.recv() => debug!("Received SIGHUP"), + _ = s_int.recv() => trace!("Received SIGINT"), + _ = s_term.recv() => trace!("Received SIGTERM"), + _ = s_quit.recv() => trace!("Received SIGQUIT"), + _ = s_hup.recv() => trace!("Received SIGHUP"), ); std::io::Result::<()>::Ok(()) @@ -116,17 +116,13 @@ impl SignalHandler { #[cfg(not(target_family = "unix"))] let signal_listener = async move { tokio::signal::ctrl_c().await?; - debug!("Received signal"); - std::io::Result::<()>::Ok(()) }; - // Block until the signal. + // Block until we receive a signal. match signal_listener.await { - Ok(()) => {} - Err(error) => { - error!("tokio::signal encountered an error: {error}"); - } + Ok(()) => debug!("Received signal, shutting down..."), + Err(error) => error!("tokio::signal encountered an error: {error}"), } self.stop(); @@ -136,14 +132,12 @@ impl SignalHandler { /// /// Note: This can only be called once, and must not be called concurrently. 
pub async fn wait_for_signals(&self) { - let receiver = self.stopped_receiver.write().take(); - - if let Some(receiver) = receiver { - if let Err(err) = receiver.await { - error!("wait_for_signals encountered an error: {err}"); - } - } else { + let Some(receiver) = self.stopped_receiver.lock().take() else { panic!("wait_for_signals must be called at most once"); + }; + + if let Err(err) = receiver.await { + error!("wait_for_signals encountered an error: {err}"); } } }