refactor: add IntoChunksExt trait for chunking #125
Conversation
Pull request overview
Refactors the timer-driven batching logic by introducing a staged chunking utility and a shared process_batches helper to reduce duplication around blockhash fetching + concurrent batch execution.
Changes:
- Add crate::utils::chunks (Chunked<I>, IntoChunksExt) and update batching call sites to use into_chunks(...).take_chunks(...); a minimal mirror of this call-site shape is sketched below.
- Add batch::process_batches to encapsulate "fetch slot/blockhash then join_all per batch" (used by consolidate + withdrawal flows).
- Update rescheduling logic in withdrawal + consolidation paths to defuse only when the underlying queue is empty.
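To make the new call-site shape concrete, here is a minimal, self-contained mirror of the staged chunking API. It restates the Chunked/IntoChunksExt design quoted later in this review in simplified form (no itertools; take_chunks is hand-rolled), so treat it as an illustration rather than the crate's exact code.

```rust
pub struct Chunked<I> {
    iter: I,
    chunk_size: usize,
}

pub trait IntoChunksExt<'a, T: 'a>: Sized {
    /// Begins a chunked collection with the given chunk size.
    fn into_chunks(self, chunk_size: usize) -> Chunked<Self>;
}

impl<'a, T: 'a, I> IntoChunksExt<'a, T> for I
where
    I: Iterator<Item = &'a T>,
{
    fn into_chunks(self, chunk_size: usize) -> Chunked<Self> {
        assert!(chunk_size > 0, "chunk_size must be greater than zero");
        Chunked { iter: self, chunk_size }
    }
}

impl<'a, T, I> Chunked<I>
where
    I: Iterator<Item = &'a T>,
    T: Clone + 'a,
{
    /// Collects at most `max_chunks` chunks, cloning items out of the iterator.
    pub fn take_chunks(mut self, max_chunks: usize) -> Vec<Vec<T>> {
        let mut chunks = Vec::new();
        while chunks.len() < max_chunks {
            let chunk: Vec<T> = self.iter.by_ref().take(self.chunk_size).cloned().collect();
            if chunk.is_empty() {
                break;
            }
            chunks.push(chunk);
        }
        chunks
    }
}

fn main() {
    let queue: Vec<u32> = (1..=7).collect();
    // e.g. chunk size ~ MAX_WITHDRAWALS_PER_TX, cap ~ MAX_CONCURRENT_RPC_CALLS
    let batches = queue.iter().into_chunks(3).take_chunks(2);
    assert_eq!(batches, vec![vec![1, 2, 3], vec![4, 5, 6]]);
}
```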
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| minter/src/withdraw/mod.rs | Switch withdrawal batching to IntoChunksExt and use process_batches; adjust reschedule defuse condition. |
| minter/src/consolidate/mod.rs | Switch consolidation batching to IntoChunksExt and use process_batches; adjust reschedule defuse condition. |
| minter/src/monitor/mod.rs | Switch signature status-check batching to IntoChunksExt. |
| minter/src/utils/mod.rs | Export new utils::chunks module. |
| minter/src/utils/chunks.rs | Introduce staged chunking API + unit tests. |
| minter/src/batch.rs | New helper to fetch slot/blockhash once and run batch futures concurrently. |
| minter/src/lib.rs | Add mod batch; to include the new helper module. |
```rust
if read_state(|s| s.pending_withdrawal_requests().is_empty()) {
    scopeguard::ScopeGuard::into_inner(reschedule);
}
```
The reschedule guard is only defused when the entire pending-withdrawal queue becomes empty. If there are pending requests but none are currently affordable (so batches is empty), or if batch creation repeatedly fails, this will reschedule with Duration::ZERO in a tight loop (and also repeatedly schedule consolidation). Consider defusing the reschedule when no batches can be built, and/or rescheduling with a delay/backoff for the “no progress” case.
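As a hedged illustration of this suggestion, the sketch below combines the scopeguard defuse pattern with a backoff: defuse when nothing is pending, back off when pending work exists but no batch can be built, and reschedule immediately otherwise. schedule_retry, process_round, and the 5-second delay are hypothetical placeholders, not the minter's actual code.

```rust
use std::time::Duration;

// Hypothetical stand-in for the timer re-arm call.
fn schedule_retry(delay: Duration) {
    println!("rescheduling in {delay:?}");
}

// Hypothetical stand-in for one round of batch processing.
fn process_round(affordable: Vec<u32>, queue_empty: bool) {
    // The guard re-arms the timer when it drops; start with an immediate reschedule.
    let mut reschedule = scopeguard::guard(Duration::ZERO, schedule_retry);

    if queue_empty {
        // Nothing pending at all: defuse so the timer is not re-armed.
        scopeguard::ScopeGuard::into_inner(reschedule);
        return;
    }
    if affordable.is_empty() {
        // Pending work exists but no batch can be built right now:
        // back off instead of hot-looping with Duration::ZERO.
        *reschedule = Duration::from_secs(5);
        return;
    }
    // ... build and submit batches; the guard fires on drop and reschedules immediately.
}

fn main() {
    process_round(vec![], true);      // defused: no reschedule
    process_round(vec![], false);     // no affordable work: reschedules after a delay
    process_round(vec![1, 2], false); // progress made: reschedules immediately
}
```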
```rust
Err(e) => log!(Priority::Info, "Deposit consolidation failed: {e}"),
let deposits = read_state(|s| group_deposits_by_account(s.deposits_to_consolidate()));
let batches = deposits
    .iter()
```
This chunking approach clones each grouped deposit entry (including the Vec<LedgerMintIndex>), which can be expensive and is avoidable here since deposits is a freshly-created Vec that could be chunked by value. Consider switching to an into_iter()-based chunking path or extending IntoChunksExt to support owned iterators so these batches can be built without cloning large vectors.
```diff
-    .iter()
+    .into_iter()
```
```rust
pub trait IntoChunksExt<'a, T: 'a>: Sized {
    /// Begins a chunked collection with the given chunk size.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero.
    fn into_chunks(self, chunk_size: usize) -> Chunked<Self>;
}

impl<'a, T: 'a, I> IntoChunksExt<'a, T> for I
where
    I: Iterator<Item = &'a T>,
{
    fn into_chunks(self, chunk_size: usize) -> Chunked<Self> {
        assert!(chunk_size > 0, "chunk_size must be greater than zero");
        Chunked {
            iter: self,
            chunk_size,
        }
    }
}
```
IntoChunksExt only supports iterators over references and requires cloning items when collecting chunks. That forces call sites to clone even when they already own a Vec<T> and could chunk by value. Consider adding an owned variant (e.g., implement for Iterator<Item = T> returning Vec<Vec<T>> without cloning) to avoid extra copies/allocations in batch-building hot paths.
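A hedged sketch of the owned variant suggested here, assuming a new trait implemented for Iterator<Item = T>; the names IntoOwnedChunksExt and into_owned_chunks are hypothetical and not part of the PR.

```rust
pub trait IntoOwnedChunksExt<T>: Sized {
    /// Splits the iterator into chunks of `chunk_size`, moving items (no `Clone` bound).
    fn into_owned_chunks(self, chunk_size: usize) -> Vec<Vec<T>>;
}

impl<T, I> IntoOwnedChunksExt<T> for I
where
    I: Iterator<Item = T>,
{
    fn into_owned_chunks(mut self, chunk_size: usize) -> Vec<Vec<T>> {
        assert!(chunk_size > 0, "chunk_size must be greater than zero");
        let mut chunks = Vec::new();
        loop {
            // Move up to `chunk_size` items into the next chunk without cloning.
            let chunk: Vec<T> = self.by_ref().take(chunk_size).collect();
            if chunk.is_empty() {
                break chunks;
            }
            chunks.push(chunk);
        }
    }
}

fn main() {
    // Items are moved into batches, so the (potentially large) values are never cloned.
    let owned = vec![vec![1u64], vec![2, 3], vec![4]];
    let batches = owned.into_iter().into_owned_chunks(2);
    assert_eq!(batches, vec![vec![vec![1], vec![2, 3]], vec![vec![4]]]);
}
```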
```rust
let batches = signatures
    .iter()
    .into_chunks(MAX_SIGNATURES_PER_STATUS_CHECK)
    .take_chunks(MAX_CONCURRENT_RPC_CALLS);
```
This change clones signatures into new Vecs when building batches (signatures.iter() ... take_chunks(...)). Since signatures is already owned here, chunking by value would avoid the extra copies. If IntoChunksExt is kept reference-only, consider using the previous into_iter().chunks(...) pattern for owned collections in hot paths.
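For reference, the by-value pattern mentioned above could look roughly like the following, leaning on itertools (which the chunking utility already uses); the constant values are illustrative only.

```rust
use itertools::Itertools;

const MAX_SIGNATURES_PER_STATUS_CHECK: usize = 256; // illustrative value
const MAX_CONCURRENT_RPC_CALLS: usize = 4;          // illustrative value

fn main() {
    let signatures: Vec<String> = (0..10).map(|i| format!("sig{i}")).collect();

    // Chunk the owned Vec by value: each chunk yields Strings that are moved,
    // not cloned, into the batch vectors.
    let chunked = signatures.into_iter().chunks(MAX_SIGNATURES_PER_STATUS_CHECK);
    let batches: Vec<Vec<String>> = chunked
        .into_iter()
        .take(MAX_CONCURRENT_RPC_CALLS)
        .map(|chunk| chunk.collect())
        .collect();

    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0].len(), 10);
}
```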
```rust
/// Fetches a recent slot and blockhash, then calls `f` concurrently for each batch.
///
/// Returns `false` if `batches` is empty or the blockhash fetch fails, indicating that no work
/// was done and the timer should not reschedule.
pub async fn process_batches<T, F, Fut>(
    runtime: &impl CanisterRuntime,
    batches: Vec<Vec<T>>,
    f: F,
) -> bool
where
```
The doc comment says the bool return value indicates whether the timer should reschedule, but process_batches doesn’t control rescheduling and current callers ignore the return value. This is misleading; either remove the timer/reschedule wording (and possibly the bool return altogether) or update call sites to use it consistently.
```rust
pub async fn process_batches<T, F, Fut>(
    runtime: &impl CanisterRuntime,
    batches: Vec<Vec<T>>,
    f: F,
) -> bool
where
    F: Fn(Vec<T>, Slot, Hash) -> Fut,
    Fut: Future<Output = ()>,
{
    if batches.is_empty() {
        return false;
    }
    let (slot, blockhash) = match get_recent_slot_and_blockhash(runtime).await {
        Ok(result) => result,
        Err(e) => {
            log!(Priority::Info, "Failed to fetch recent blockhash: {e}");
            return false;
        }
    };
    futures::future::join_all(batches.into_iter().map(|batch| f(batch, slot, blockhash))).await;
    true
}
```
process_batches is new shared behavior (empty-batch handling, blockhash-fetch failure path, and “call f for each batch” semantics) but has no unit tests. The repo already has a TestCanisterRuntime and tests for get_recent_slot_and_blockhash (e.g., minter/src/transaction/tests.rs), so it should be feasible to add focused tests verifying the return value and that f is/isn't invoked in the empty/error/success cases.
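A sketch of the kind of focused test the comment asks for, written against a simplified stand-in rather than the real CanisterRuntime/TestCanisterRuntime (whose APIs are not shown in this review). It only demonstrates the intended assertions: empty input and fetch failure return false without invoking the callback, and success invokes it once per batch.

```rust
use std::cell::Cell;

// Simplified, synchronous stand-in mirroring the control flow of process_batches.
fn process_batches_like<T>(
    fetch: impl Fn() -> Result<(u64, [u8; 32]), String>,
    batches: Vec<Vec<T>>,
    f: impl Fn(Vec<T>, u64, [u8; 32]),
) -> bool {
    if batches.is_empty() {
        return false;
    }
    let (slot, blockhash) = match fetch() {
        Ok(result) => result,
        Err(_) => return false,
    };
    for batch in batches {
        f(batch, slot, blockhash);
    }
    true
}

fn main() {
    let calls = Cell::new(0usize);
    let count = |_: Vec<u8>, _: u64, _: [u8; 32]| calls.set(calls.get() + 1);

    // Empty input: returns false and never invokes the batch callback.
    assert!(!process_batches_like(|| Ok((1, [0; 32])), Vec::<Vec<u8>>::new(), &count));
    // Fetch failure: returns false and never invokes the batch callback.
    assert!(!process_batches_like(|| Err("rpc error".to_string()), vec![vec![1u8]], &count));
    assert_eq!(calls.get(), 0);
    // Success: callback invoked once per batch.
    assert!(process_batches_like(|| Ok((1, [0; 32])), vec![vec![1u8], vec![2u8]], &count));
    assert_eq!(calls.get(), 2);
}
```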
```rust
if read_state(|s| s.deposits_to_consolidate().is_empty()) {
    scopeguard::ScopeGuard::into_inner(reschedule);
}
```
Rescheduling is now defused only when deposits_to_consolidate() becomes empty. If submit_consolidation_transaction fails before it records a SubmittedTransaction event (e.g., signing or transaction construction fails), the queue won’t shrink and this will reschedule with Duration::ZERO repeatedly, potentially hot-looping on a persistent error. Consider adding delay/backoff (or only immediate-reschedule when progress was made / queue length decreased).
```rust
log!(Priority::Info, "Failed to fetch recent blockhash: {e}");
return;
let batches = affordable_requests
    .iter()
```
affordable_requests is already built by cloning pending requests from state, and this chunking path clones again (.iter().into_chunks(...).take_chunks(...) clones each element). This introduces unnecessary extra allocations/copies per round; consider chunking by value (e.g., using into_iter()-based chunking) or extending IntoChunksExt to support owned items so batches can be formed without cloning.
```diff
-    .iter()
+    .into_iter()
```
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
```rust
let batches = affordable
    .iter()
    .into_chunks(MAX_WITHDRAWALS_PER_TX)
    .take_chunks(MAX_CONCURRENT_RPC_CALLS);
(batches, more_to_process, needs_consolidation)
```
affordable is already built by cloning requests from state, and then affordable.iter().into_chunks(...).take_chunks(...) clones each WithdrawalRequest again when building batches. This adds avoidable allocations/copies for every processed request. Consider chunking the owned affordable vector by value (moving items into batches) or extending the chunking utility to support Iterator<Item = T> so these batches can be built without a second clone.
```rust
impl<'a, T, I> Chunked<I>
where
    I: Iterator<Item = &'a T>,
    T: Clone + 'a,
{
    /// Collects at most `max_chunks` chunks, discarding any remaining items.
    pub fn take_chunks(self, max_chunks: usize) -> Vec<Vec<T>> {
        let chunked = self.iter.chunks(self.chunk_size);
        chunked
            .into_iter()
            .take(max_chunks)
            .map(|chunk| chunk.cloned().collect())
            .collect()
    }
```
The current Chunked/IntoChunksExt design only supports Iterator<Item = &T> and builds chunks by cloning (chunk.cloned().collect()). This forces callers with owned data (e.g., Vec<(Account, (Lamport, Vec<...>))>) to iterate by reference and clone just to batch. Consider adding an owning variant/impl that chunks Iterator<Item = T> by moving items, so batch construction can remain allocation-efficient.
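One possible shape for that owning variant while keeping the staged Chunked design is a second inherent impl for Iterator<Item = T> with a by-value collection method. This is a sketch only; take_chunks_owned is a hypothetical name, and the struct is restated here so the snippet stands alone.

```rust
pub struct Chunked<I> {
    iter: I,
    chunk_size: usize,
}

// Owning counterpart to the reference-based take_chunks above: items are moved
// out of the underlying iterator, so no Clone bound is needed.
impl<T, I> Chunked<I>
where
    I: Iterator<Item = T>,
{
    /// Collects at most `max_chunks` chunks by value, discarding any remaining items.
    pub fn take_chunks_owned(mut self, max_chunks: usize) -> Vec<Vec<T>> {
        let mut chunks = Vec::with_capacity(max_chunks);
        while chunks.len() < max_chunks {
            let chunk: Vec<T> = self.iter.by_ref().take(self.chunk_size).collect();
            if chunk.is_empty() {
                break;
            }
            chunks.push(chunk);
        }
        chunks
    }
}

fn main() {
    let affordable = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let chunked = Chunked { iter: affordable.into_iter(), chunk_size: 2 };
    let batches = chunked.take_chunks_owned(8);
    assert_eq!(batches, vec![vec!["a".to_string(), "b".to_string()], vec!["c".to_string()]]);
}
```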
Summary
- Add crate::utils::chunks with Chunked<I> and IntoChunksExt, a staged chunking API replacing into_batches; call sites now read queue.iter().into_chunks(CHUNK_SIZE).take_chunks(MAX_CONCURRENT_RPC_CALLS)
- Add process_batches to the batch module, encapsulating fetch-blockhash + join_all for the consolidation and withdrawal timers
- Reschedule based on process_batches's return value, preserving the fix from refactor: single-round timers with rescheduling (#97)

Stacks on #97.
🤖 Generated with Claude Code