
refactor: add IntoChunksExt trait for chunking #125

Draft

lpahlavi wants to merge 1 commit into main from lpahlavi/chunking-helpers

Conversation


@lpahlavi lpahlavi commented Apr 13, 2026

Summary

  • Adds crate::utils::chunks with Chunked<I> and IntoChunksExt, a staged chunking API replacing into_batches
  • All four timers now use queue.iter().into_chunks(CHUNK_SIZE).take_chunks(MAX_CONCURRENT_RPC_CALLS)
  • Adds process_batches to the batch module, encapsulating fetch-blockhash + join_all for the consolidation and withdrawal timers
  • Reschedule decisions are based purely on queue emptiness (not on process_batches's return value), preserving the fix from #97 (refactor: single-round timers with rescheduling)

Stacks on #97.
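To illustrate the staged shape (a sketch: the values are made up, but the trait path matches the PR):

use crate::utils::chunks::IntoChunksExt;

#[test]
fn staged_chunking_takes_at_most_max_chunks() {
    // With chunk size 2 and at most 2 chunks per round, five queued items
    // yield two batches; the fifth item waits for the next timer round.
    let queue = vec![1, 2, 3, 4, 5];
    let batches = queue.iter().into_chunks(2).take_chunks(2);
    assert_eq!(batches, vec![vec![1, 2], vec![3, 4]]);
}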

🤖 Generated with Claude Code

@lpahlavi lpahlavi force-pushed the lpahlavi/single-round-timers branch 8 times, most recently from f0c874a to 8613a52 on April 14, 2026 12:30
@lpahlavi lpahlavi changed the title to refactor: IntoChunksExt and process_batches Apr 14, 2026
@lpahlavi lpahlavi force-pushed the lpahlavi/chunking-helpers branch from 351e838 to 964d930 on April 14, 2026 15:20
Base automatically changed from lpahlavi/single-round-timers to main April 14, 2026 17:00
Copilot AI review requested due to automatic review settings April 15, 2026 08:47
@lpahlavi lpahlavi force-pushed the lpahlavi/chunking-helpers branch from 964d930 to 72c5887 on April 15, 2026 08:47

Copilot AI left a comment


Pull request overview

Refactors the timer-driven batching logic by introducing a staged chunking utility and a shared process_batches helper to reduce duplication around blockhash fetching + concurrent batch execution.

Changes:

  • Add crate::utils::chunks (Chunked<I>, IntoChunksExt) and update batching call sites to use into_chunks(...).take_chunks(...).
  • Add batch::process_batches to encapsulate “fetch slot/blockhash then join_all per batch” (used by consolidate + withdrawal flows).
  • Update rescheduling logic in withdrawal + consolidation paths to defuse only when the underlying queue is empty.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.

minter/src/withdraw/mod.rs: Switch withdrawal batching to IntoChunksExt and use process_batches; adjust reschedule defuse condition.
minter/src/consolidate/mod.rs: Switch consolidation batching to IntoChunksExt and use process_batches; adjust reschedule defuse condition.
minter/src/monitor/mod.rs: Switch signature status-check batching to IntoChunksExt.
minter/src/utils/mod.rs: Export new utils::chunks module.
minter/src/utils/chunks.rs: Introduce staged chunking API and unit tests.
minter/src/batch.rs: New helper to fetch slot/blockhash once and run batch futures concurrently.
minter/src/lib.rs: Add mod batch; to include the new helper module.


Comment thread minter/src/withdraw/mod.rs Outdated
Comment on lines 154 to 156
if read_state(|s| s.pending_withdrawal_requests().is_empty()) {
    scopeguard::ScopeGuard::into_inner(reschedule);
}

Copilot AI Apr 15, 2026


The reschedule guard is only defused when the entire pending-withdrawal queue becomes empty. If there are pending requests but none are currently affordable (so batches is empty), or if batch creation repeatedly fails, this will reschedule with Duration::ZERO in a tight loop (and also repeatedly schedule consolidation). Consider defusing the reschedule when no batches can be built, and/or rescheduling with a delay/backoff for the “no progress” case.
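A minimal, self-contained sketch of the defuse idea (stand-in data; not code from this PR):

use scopeguard::guard;

fn main() {
    // Stand-ins for the minter's state: `pending` is the queue, `batches`
    // the chunks built this round (empty when nothing is affordable).
    let pending: Vec<u32> = vec![1, 2, 3];
    let batches: Vec<Vec<u32>> = Vec::new();

    let reschedule = guard((), |()| {
        // In the minter this re-arms the timer; a backoff delay instead of
        // Duration::ZERO would avoid the hot loop described above.
        println!("rescheduling");
    });

    // Defuse when the queue is empty OR when no batches could be built, so a
    // nonempty-but-unprocessable queue does not reschedule in a tight loop.
    if pending.is_empty() || batches.is_empty() {
        scopeguard::ScopeGuard::into_inner(reschedule);
    }
}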

Comment thread minter/src/consolidate/mod.rs Outdated
Err(e) => log!(Priority::Info, "Deposit consolidation failed: {e}"),
let deposits = read_state(|s| group_deposits_by_account(s.deposits_to_consolidate()));
let batches = deposits
    .iter()

Copilot AI Apr 15, 2026


This chunking approach clones each grouped deposit entry (including the Vec<LedgerMintIndex>), which can be expensive and is avoidable here since deposits is a freshly-created Vec that could be chunked by value. Consider switching to an into_iter()-based chunking path or extending IntoChunksExt to support owned iterators so these batches can be built without cloning large vectors.

Suggested change
.iter()
.into_iter()

Comment on lines +39 to +59
pub trait IntoChunksExt<'a, T: 'a>: Sized {
    /// Begins a chunked collection with the given chunk size.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero.
    fn into_chunks(self, chunk_size: usize) -> Chunked<Self>;
}

impl<'a, T: 'a, I> IntoChunksExt<'a, T> for I
where
    I: Iterator<Item = &'a T>,
{
    fn into_chunks(self, chunk_size: usize) -> Chunked<Self> {
        assert!(chunk_size > 0, "chunk_size must be greater than zero");
        Chunked {
            iter: self,
            chunk_size,
        }
    }
}

Copilot AI Apr 15, 2026


IntoChunksExt only supports iterators over references and requires cloning items when collecting chunks. That forces call sites to clone even when they already own a Vec<T> and could chunk by value. Consider adding an owned variant (e.g., implement for Iterator<Item = T> returning Vec<Vec<T>> without cloning) to avoid extra copies/allocations in batch-building hot paths.
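One way the owned variant could look (a hypothetical sketch, not code from this PR; the trait and method names are invented for illustration):

/// Chunk any `Iterator<Item = T>` by moving items, so callers that already
/// own a `Vec<T>` can batch without per-item clones.
pub trait IntoOwnedChunksExt<T>: Iterator<Item = T> + Sized {
    fn into_owned_chunks(mut self, chunk_size: usize, max_chunks: usize) -> Vec<Vec<T>> {
        assert!(chunk_size > 0, "chunk_size must be greater than zero");
        let mut chunks = Vec::new();
        while chunks.len() < max_chunks {
            // `by_ref` lets `take` borrow the iterator instead of consuming it.
            let chunk: Vec<T> = self.by_ref().take(chunk_size).collect();
            if chunk.is_empty() {
                break; // iterator exhausted
            }
            chunks.push(chunk);
        }
        chunks
    }
}

impl<T, I: Iterator<Item = T>> IntoOwnedChunksExt<T> for I {}

Call sites holding owned data could then write owned_vec.into_iter().into_owned_chunks(CHUNK_SIZE, MAX_CONCURRENT_RPC_CALLS) with no Clone bound at all.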

Comment thread minter/src/monitor/mod.rs Outdated
Comment on lines +177 to +180
let batches = signatures
    .iter()
    .into_chunks(MAX_SIGNATURES_PER_STATUS_CHECK)
    .take_chunks(MAX_CONCURRENT_RPC_CALLS);

Copilot AI Apr 15, 2026


This change clones signatures into new Vecs when building batches (signatures.iter() ... take_chunks(...)). Since signatures is already owned here, chunking by value would avoid the extra copies. If IntoChunksExt is kept reference-only, consider using the previous into_iter().chunks(...) pattern for owned collections in hot paths.

Comment thread minter/src/batch.rs Outdated
Comment on lines +8 to +17
/// Fetches a recent slot and blockhash, then calls `f` concurrently for each batch.
///
/// Returns `false` if `batches` is empty or the blockhash fetch fails, indicating that no work
/// was done and the timer should not reschedule.
pub async fn process_batches<T, F, Fut>(
    runtime: &impl CanisterRuntime,
    batches: Vec<Vec<T>>,
    f: F,
) -> bool
where

Copilot AI Apr 15, 2026


The doc comment says the bool return value indicates whether the timer should reschedule, but process_batches doesn’t control rescheduling and current callers ignore the return value. This is misleading; either remove the timer/reschedule wording (and possibly the bool return altogether) or update call sites to use it consistently.
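One possible rewording of the doc comment (a sketch, not the PR's text), dropping the timer framing and documenting the bool purely as a "work happened" signal:

/// Fetches a recent slot and blockhash, then calls `f` concurrently for each batch.
///
/// Returns `true` if `f` was invoked for the batches (i.e. `batches` was
/// non-empty and the blockhash fetch succeeded), `false` otherwise. Callers
/// decide what, if anything, to do with this signal.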

Comment thread minter/src/batch.rs Outdated
Comment on lines +12 to +33
pub async fn process_batches<T, F, Fut>(
    runtime: &impl CanisterRuntime,
    batches: Vec<Vec<T>>,
    f: F,
) -> bool
where
    F: Fn(Vec<T>, Slot, Hash) -> Fut,
    Fut: Future<Output = ()>,
{
    if batches.is_empty() {
        return false;
    }
    let (slot, blockhash) = match get_recent_slot_and_blockhash(runtime).await {
        Ok(result) => result,
        Err(e) => {
            log!(Priority::Info, "Failed to fetch recent blockhash: {e}");
            return false;
        }
    };
    futures::future::join_all(batches.into_iter().map(|batch| f(batch, slot, blockhash))).await;
    true
}

Copilot AI Apr 15, 2026


process_batches is new shared behavior (empty-batch handling, the blockhash-fetch failure path, and the “call f for each batch” semantics) but has no unit tests. The repo already has a TestCanisterRuntime and tests for get_recent_slot_and_blockhash (e.g., minter/src/transaction/tests.rs), so it should be feasible to add focused tests verifying the return value and that f is/isn't invoked in the empty/error/success cases; one such test is sketched below.
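A focused test along those lines might look like this (a sketch; it assumes TestCanisterRuntime implements Default and that process_batches is in scope):

use std::cell::Cell;

#[test]
fn process_batches_reports_no_work_for_empty_input() {
    let runtime = TestCanisterRuntime::default();
    let called = Cell::new(false);
    let ran = futures::executor::block_on(process_batches(
        &runtime,
        Vec::<Vec<u8>>::new(),
        |_batch, _slot, _blockhash| {
            called.set(true);
            async {}
        },
    ));
    assert!(!ran, "empty input should report that no work was done");
    assert!(!called.get(), "f must not be invoked when there are no batches");
}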

Comment thread minter/src/consolidate/mod.rs Outdated
Comment on lines 64 to 66
if read_state(|s| s.deposits_to_consolidate().is_empty()) {
    scopeguard::ScopeGuard::into_inner(reschedule);
}

Copilot AI Apr 15, 2026


Rescheduling is now defused only when deposits_to_consolidate() becomes empty. If submit_consolidation_transaction fails before it records a SubmittedTransaction event (e.g., signing or transaction construction fails), the queue won't shrink and this will reschedule with Duration::ZERO repeatedly, potentially hot-looping on a persistent error. Consider adding a delay/backoff (sketched below), or rescheduling immediately only when progress was made, i.e. the queue length decreased.
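A sketch of the backoff idea (the constant and helper are hypothetical, not part of the PR):

use std::time::Duration;

/// Hypothetical retry delay for rounds that made no progress.
const RETRY_DELAY: Duration = Duration::from_secs(10);

fn next_reschedule_delay(made_progress: bool) -> Duration {
    if made_progress {
        Duration::ZERO // keep draining immediately while work succeeds
    } else {
        RETRY_DELAY // back off on persistent errors to avoid a hot loop
    }
}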

Comment thread minter/src/withdraw/mod.rs Outdated
log!(Priority::Info, "Failed to fetch recent blockhash: {e}");
return;
let batches = affordable_requests
    .iter()

Copilot AI Apr 15, 2026


affordable_requests is already built by cloning pending requests from state, and this chunking path clones again (.iter().into_chunks(...).take_chunks(...) clones each element). This introduces unnecessary extra allocations/copies per round; consider chunking by value (e.g., using into_iter()-based chunking) or extending IntoChunksExt to support owned items so batches can be formed without cloning.

Suggested change
.iter()
.into_iter()

@lpahlavi lpahlavi force-pushed the lpahlavi/chunking-helpers branch from 72c5887 to a5f8482 on April 15, 2026 09:26
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 15, 2026 09:31
@lpahlavi lpahlavi force-pushed the lpahlavi/chunking-helpers branch from a5f8482 to 91e009f on April 15, 2026 09:31

Copilot AI left a comment


Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.



Comment on lines +125 to +129
let batches = affordable
    .iter()
    .into_chunks(MAX_WITHDRAWALS_PER_TX)
    .take_chunks(MAX_CONCURRENT_RPC_CALLS);
(batches, more_to_process, needs_consolidation)

Copilot AI Apr 15, 2026


affordable is already built by cloning requests from state, and then affordable.iter().into_chunks(...).take_chunks(...) clones each WithdrawalRequest again when building batches. This adds avoidable allocations/copies for every processed request. Consider chunking the owned affordable vector by value (moving items into batches) or extending the chunking utility to support Iterator<Item = T> so these batches can be built without a second clone.

Comment on lines +17 to +30
impl<'a, T, I> Chunked<I>
where
    I: Iterator<Item = &'a T>,
    T: Clone + 'a,
{
    /// Collects at most `max_chunks` chunks, discarding any remaining items.
    pub fn take_chunks(self, max_chunks: usize) -> Vec<Vec<T>> {
        let chunked = self.iter.chunks(self.chunk_size);
        chunked
            .into_iter()
            .take(max_chunks)
            .map(|chunk| chunk.cloned().collect())
            .collect()
    }

Copilot AI Apr 15, 2026


The current Chunked/IntoChunksExt design only supports Iterator<Item = &T> and builds chunks by cloning (chunk.cloned().collect()). This forces callers with owned data (e.g., Vec<(Account, (Lamport, Vec<...>))>) to iterate by reference and clone just to batch. Consider adding an owning variant/impl that chunks Iterator<Item = T> by moving items, so batch construction can remain allocation-efficient.

@lpahlavi lpahlavi changed the title from refactor: IntoChunksExt and process_batches to refactor: add IntoChunksExt trait for chunking Apr 15, 2026