Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 112 additions & 54 deletions finalizer/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use commonware_storage::translator::EightCap;
use commonware_utils::acknowledgement::{Acknowledgement, Exact};
use commonware_utils::{NZU64, NZUsize, hex};
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, StreamExt as _, select};
use futures::{FutureExt, StreamExt as _, select_biased};
#[cfg(feature = "prom")]
use metrics::{counter, histogram};
#[cfg(debug_assertions)]
Expand All @@ -31,7 +31,9 @@ use summit_orchestrator::Message;
use summit_syncer::Update;
use summit_types::account::{ValidatorAccount, ValidatorStatus};
use summit_types::checkpoint::Checkpoint;
use summit_types::consensus_state_query::{ConsensusStateRequest, ConsensusStateResponse};
use summit_types::consensus_state_query::{
ConsensusStateQuery, ConsensusStateRequest, ConsensusStateResponse,
};
use summit_types::execution_request::{
DepositRequest, ExecutionRequest, ParsedExecutionRequest, WithdrawalRequest,
};
Expand All @@ -40,7 +42,7 @@ use summit_types::ext_private_key::derive_observer_keys;
use summit_types::network_oracle::NetworkOracle;
use summit_types::protocol_params::ProtocolParam;
use summit_types::scheme::EpochTransition;
use summit_types::ssz_state_tree::SszProof;
use summit_types::ssz_state_tree::{SszProof, SszStateTree};
use summit_types::ssz_tree_key::SszStateKey;
use summit_types::utils::{
is_first_block_of_epoch, is_last_block_of_epoch, is_penultimate_block_of_epoch,
Expand All @@ -56,6 +58,55 @@ use tracing::{debug, error, info, trace, warn};

const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024);

type FinalizerScheme<V> = bls12381_multisig::Scheme<PublicKey, V>;
type StateQueryResponse<V> = ConsensusStateResponse<FinalizerScheme<V>>;
type StateQueryMessage<V> = (
ConsensusStateRequest,
oneshot::Sender<StateQueryResponse<V>>,
);

/// Generate one proof slot per requested key, preserving positional alignment:
/// the i-th result corresponds to the i-th key, and a key that resolves to no
/// proof (missing collection entry, etc.) yields `None` rather than being
/// dropped. Callers rely on this one-result-per-key invariant — see #260/#267 —
/// so this must stay a `map` (not `filter_map`) into `Vec<Option<SszProof>>`.
fn generate_state_proofs(
proof_tree: &SszStateTree,
validator_keys: &[[u8; 32]],
keys: &[SszStateKey],
) -> Vec<Option<SszProof>> {
keys.iter()
.map(|key| match key {
SszStateKey::Scalar(leaf_index) => Some(proof_tree.generate_scalar_proof(*leaf_index)),
SszStateKey::Validator(pubkey) => {
proof_tree.generate_validator_proof(pubkey, validator_keys)
}
SszStateKey::ValidatorField(pubkey, field_index) => {
proof_tree.generate_validator_field_proof(pubkey, *field_index, validator_keys)
}
SszStateKey::Deposit(index) => proof_tree.generate_deposit_proof(*index),
SszStateKey::DepositField(index, field_index) => {
proof_tree.generate_deposit_field_proof(*index, *field_index)
}
SszStateKey::Withdrawal(pubkey) => proof_tree.generate_withdrawal_proof_by_key(pubkey),
SszStateKey::WithdrawalField(pubkey, field_index) => {
proof_tree.generate_withdrawal_field_proof_by_key(pubkey, *field_index)
}
SszStateKey::ProtocolParam(index) => proof_tree.generate_protocol_param_proof(*index),
SszStateKey::ProtocolParamField(index, field_index) => {
proof_tree.generate_protocol_param_field_proof(*index, *field_index)
}
SszStateKey::AddedValidator(index) => proof_tree.generate_added_validator_proof(*index),
SszStateKey::AddedValidatorField(index, field_index) => {
proof_tree.generate_added_validator_field_proof(*index, *field_index)
}
SszStateKey::RemovedValidator(index) => {
proof_tree.generate_removed_validator_proof(*index)
}
})
.collect()
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DepositRejectionReason {
Refund,
Expand Down Expand Up @@ -245,6 +296,7 @@ pub struct Finalizer<
V: Variant,
> {
mailbox: mpsc::Receiver<FinalizerMessage<bls12381_multisig::Scheme<PublicKey, V>, Block>>,
state_query: mpsc::Receiver<StateQueryMessage<V>>,
pending_height_notifys: BTreeMap<(u64, Digest), Vec<oneshot::Sender<bool>>>,
context: ContextCell<R>,
engine_client: C,
Expand Down Expand Up @@ -317,8 +369,10 @@ impl<
Self,
ConsensusState,
FinalizerMailbox<bls12381_multisig::Scheme<PublicKey, V>, Block>,
ConsensusStateQuery<bls12381_multisig::Scheme<PublicKey, V>>,
) {
let (tx, rx) = mpsc::channel(cfg.mailbox_size);
let (state_query, state_query_rx) = ConsensusStateQuery::new(cfg.mailbox_size);
let state_cfg = StateConfig {
log: commonware_storage::journal::contiguous::variable::Config {
partition: format!("{}-finalizer_state-log", cfg.db_prefix),
Expand Down Expand Up @@ -384,6 +438,7 @@ impl<
Self {
context: ContextCell::new(context),
mailbox: rx,
state_query: state_query_rx,
engine_client: cfg.engine_client,
oracle: cfg.oracle,
pending_height_notifys: BTreeMap::new(),
Expand Down Expand Up @@ -417,6 +472,7 @@ impl<
},
shared_state,
FinalizerMailbox::new(tx),
state_query,
)
}

Expand All @@ -428,6 +484,8 @@ impl<
let mut last_committed_timestamp: Option<Instant> = None;
let mut signal = self.context.stopped().fuse();
let cancellation_token = self.cancellation_token.clone();
let (_, empty_state_query) = mpsc::channel(1);
let mut state_query = Some(std::mem::replace(&mut self.state_query, empty_state_query));

// Initialize the current epoch with the validator set
// This ensures the orchestrator can start consensus immediately
Expand Down Expand Up @@ -542,7 +600,16 @@ impl<
self.cancellation_token.cancel();
break;
}
select! {
let query_message = async {
match state_query.as_mut() {
Some(state_query) => state_query.next().await,
None => std::future::pending().await,
}
}
.fuse();
futures::pin_mut!(query_message);

select_biased! {
mailbox_message = self.mailbox.next() => {
let mail = mailbox_message.expect("Finalizer mailbox closed");
match mail {
Expand Down Expand Up @@ -702,6 +769,17 @@ impl<
sig = &mut signal => {
info!("runtime terminated, shutting down finalizer: {}", sig.unwrap());
break;
},
query_message = query_message => {
match query_message {
Some((request, response)) => {
self.handle_consensus_state_query(request, response).await;
}
None => {
warn!("finalizer state query mailbox closed");
state_query = None;
}
}
}
}
}
Expand Down Expand Up @@ -1832,58 +1910,38 @@ impl<
el_block_number,
});
}
ConsensusStateRequest::GenerateStateProof(keys) => {
let proof_tree = self.canonical_state.proof_tree();
let proofs: Vec<Option<SszProof>> = keys
.iter()
.map(|key| match key {
SszStateKey::Scalar(leaf_index) => {
Some(proof_tree.generate_scalar_proof(*leaf_index))
}
SszStateKey::Validator(pubkey) => proof_tree.generate_validator_proof(
pubkey,
self.canonical_state.proof_validator_keys(),
),
SszStateKey::ValidatorField(pubkey, field_index) => proof_tree
.generate_validator_field_proof(
pubkey,
*field_index,
self.canonical_state.proof_validator_keys(),
),
SszStateKey::Deposit(index) => proof_tree.generate_deposit_proof(*index),
SszStateKey::DepositField(index, field_index) => {
proof_tree.generate_deposit_field_proof(*index, *field_index)
}
SszStateKey::Withdrawal(pubkey) => {
proof_tree.generate_withdrawal_proof_by_key(pubkey)
}
SszStateKey::WithdrawalField(pubkey, field_index) => {
proof_tree.generate_withdrawal_field_proof_by_key(pubkey, *field_index)
}
SszStateKey::ProtocolParam(index) => {
proof_tree.generate_protocol_param_proof(*index)
}
SszStateKey::ProtocolParamField(index, field_index) => {
proof_tree.generate_protocol_param_field_proof(*index, *field_index)
}
SszStateKey::AddedValidator(index) => {
proof_tree.generate_added_validator_proof(*index)
}
SszStateKey::AddedValidatorField(index, field_index) => {
proof_tree.generate_added_validator_field_proof(*index, *field_index)
}
SszStateKey::RemovedValidator(index) => {
proof_tree.generate_removed_validator_proof(*index)
}
})
.collect();
ConsensusStateRequest::GenerateStateProof(keys, permit) => {
let proof_tree = self.canonical_state.proof_tree_snapshot();
let validator_keys = self.canonical_state.proof_validator_keys_snapshot();
let root = self.canonical_state.get_state_root();
let el_block_number = self.canonical_state.get_proof_el_block_number();
let _ = sender.send(ConsensusStateResponse::StateProof {
root,
el_block_number,
proofs,
});
self.context
.with_label("state_proof")
.shared(true)
.spawn(move |_| async move {
let proofs = generate_state_proofs(
proof_tree.as_ref(),
validator_keys.as_slice(),
&keys,
);
let _ = sender.send(ConsensusStateResponse::StateProof {
root,
el_block_number,
proofs,
});
// Release the rpc concurrency permit (if any) only now
// that the proof work has actually finished. The permit
// must be consumed inside this detached task because
// moving it here is what ties the in-flight-proof count
// to real work. If it instead dropped on the rpc handler
// future, a caller could connect, wait for the spawn,
// disconnect, and repeat to pile up proof tasks under a
// slot count that reads as idle. The explicit drop also
// forces the move closure to capture it rather than
// dropping it on the finalizer loop when the match arm
// ends.
drop(permit);
});
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion finalizer/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl<S: Scheme<B::Digest>, B: ConsensusBlock> FinalizerMailbox<S, B> {
Vec<Option<summit_types::ssz_state_tree::SszProof>>,
) {
let (response, rx) = oneshot::channel();
let request = ConsensusStateRequest::GenerateStateProof(keys);
let request = ConsensusStateRequest::GenerateStateProof(keys, None);
let _ = self
.sender
.clone()
Expand Down
22 changes: 11 additions & 11 deletions finalizer/src/tests/fork_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn test_orphaned_block_processed_when_parent_arrives() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -266,7 +266,7 @@ fn test_fork_aux_data_does_not_finalize_unfinalized_fork_head() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -374,7 +374,7 @@ fn test_losing_height_waiter_resolves_false_on_conflicting_finalization() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -463,7 +463,7 @@ fn test_competing_digest_waiter_stays_pending_until_finalization() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -553,7 +553,7 @@ fn test_finalization_resolves_lower_waiters_and_preserves_future_waiters() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -643,7 +643,7 @@ fn test_multiple_forks_tracked() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -729,7 +729,7 @@ fn test_dead_fork_block_discarded() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -831,7 +831,7 @@ fn test_fork_states_pruned_after_finalization() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -952,7 +952,7 @@ fn test_orphaned_blocks_pruned_after_finalization() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -1065,7 +1065,7 @@ fn test_fork_state_reused_when_notarized_then_finalized() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down Expand Up @@ -1174,7 +1174,7 @@ fn test_competing_fork_pruned_on_finalization() {
_variant_marker: PhantomData,
};

let (finalizer, _state, mut mailbox) =
let (finalizer, _state, mut mailbox, _state_query) =
Finalizer::<_, MockEngineClient, MockNetworkOracle, ed25519::PrivateKey, MinPk>::new(
context.with_label("finalizer"),
finalizer_cfg,
Expand Down
Loading
Loading