Is your feature request related to a problem or challenge?
When a child stream of an operator is exhausted, the operator should drop it immediately, instead of keeping it alive (and holding its resources) until the owning stream is itself dropped.
Dropping early can free a large amount of resources, reduces memory pressure, and drops the child's MemoryConsumers, letting the memory pools make better-informed decisions. It probably has further benefits down the line.
Describe the solution you'd like
Whenever a stream polls its child and the child returns Ready(None), drop that child stream.
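A minimal, std-only sketch of this rule. It assumes a stand-in PollNext trait that mirrors the signature of futures::Stream::poll_next; PollNext, the DropOnExhausted wrapper, and the Counter child are hypothetical names for illustration, not DataFusion APIs. The wrapper holds the child in an Option and clears it on Ready(None), so the child's Drop runs at that moment rather than when the wrapper goes away:

```rust
use std::cell::Cell;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical stand-in for `futures::Stream::poll_next`, so the sketch needs only std.
trait PollNext {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical wrapper: forwards polls to the child and drops it on Ready(None).
struct DropOnExhausted<S> {
    inner: Option<S>,
}

impl<S: PollNext + Unpin> PollNext for DropOnExhausted<S> {
    type Item = S::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = self.get_mut();
        let Some(child) = this.inner.as_mut() else {
            return Poll::Ready(None); // child was already dropped
        };
        match Pin::new(child).poll_next(cx) {
            Poll::Ready(None) => {
                this.inner = None; // runs the child's Drop right now
                Poll::Ready(None)
            }
            other => other,
        }
    }
}

/// Hypothetical child: yields `remaining` items and records when it is dropped.
struct Counter {
    remaining: u32,
    dropped: Rc<Cell<bool>>,
}

impl PollNext for Counter {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            Poll::Ready(None)
        } else {
            self.remaining -= 1;
            Poll::Ready(Some(self.remaining))
        }
    }
}

impl Drop for Counter {
    fn drop(&mut self) {
        self.dropped.set(true);
    }
}

// A waker that does nothing; enough to drive always-ready streams by hand.
unsafe fn noop_clone(_: *const ()) -> RawWaker {
    RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
}
unsafe fn noop(_: *const ()) {}
static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

fn noop_waker() -> Waker {
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) }
}

/// Usage: drain the wrapper, then report the items and whether the child
/// was dropped while the wrapper itself is still alive.
fn demo() -> (Vec<u32>, bool) {
    let dropped = Rc::new(Cell::new(false));
    let mut stream = DropOnExhausted {
        inner: Some(Counter { remaining: 2, dropped: Rc::clone(&dropped) }),
    };
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(x)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(x);
    }
    let child_dropped = dropped.get(); // `stream` is still in scope here
    (items, child_dropped)
}
```

In this sketch demo() yields the items 1 and 0 and reports the child as already dropped, even though the wrapper is still in scope. In a real operator the same effect could plausibly be had by storing the input as an Option field and clearing it inside poll_next.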
Consider the sort, which does roughly the following:
```rust
let mut sorter = ExternalSorter::new(
    partition,
    input.schema(),
    self.expr.clone(),
    context.session_config().batch_size(),
    execution_options.sort_spill_reservation_bytes,
    execution_options.sort_in_place_threshold_bytes,
    context.session_config().spill_compression(),
    &self.metrics_set,
    context.runtime_env(),
)?;

Ok(Box::pin(RecordBatchStreamAdapter::new(
    self.schema(),
    futures::stream::once(async move {
        // Buffer (and possibly spill) every batch from the child.
        while let Some(batch) = input.next().await {
            let batch = batch?;
            sorter.insert_batch(batch).await?;
        }
        // `input` is exhausted here, but it is only dropped with this stream.
        sorter.sort().await
    })
    .try_flatten(),
)))
```
This stream is memory-intensive in general, and the final sorter.sort().await in particular can start an expensive merge whose performance is severely limited by the available memory.
With a naive fair pool, an aggregate feeding into this sort keeps half of the memory reserved for itself, unavailable to the sort, even though the aggregate has already finished its work.
Now imagine adding drop(input); right after the while loop: the aggregate's consumers are released, so sorter.sort().await would now be given a 2x allocation from the pool!
The same logic applies to pretty much every operator that consumes a child stream.
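The 2x claim can be checked with a toy model. This sketch assumes a hypothetical ToyFairPool that simply divides its capacity evenly among the currently registered consumers (a simplification for illustration, not DataFusion's actual FairSpillPool API), with each registration released when it is dropped. FinishedInput is a hypothetical stand-in for the exhausted aggregate:

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical toy pool: capacity is split evenly among registered consumers.
/// A simplification for illustration, not DataFusion's FairSpillPool.
struct ToyFairPool {
    capacity: usize,
    consumers: Cell<usize>,
}

/// RAII registration: dropping it unregisters the consumer from the pool.
struct Registration {
    pool: Rc<ToyFairPool>,
}

impl ToyFairPool {
    fn register(pool: &Rc<ToyFairPool>) -> Registration {
        pool.consumers.set(pool.consumers.get() + 1);
        Registration { pool: Rc::clone(pool) }
    }
}

impl Registration {
    /// This consumer's current fair share of the pool.
    fn fair_share(&self) -> usize {
        self.pool.capacity / self.pool.consumers.get()
    }
}

impl Drop for Registration {
    fn drop(&mut self) {
        self.pool.consumers.set(self.pool.consumers.get() - 1);
    }
}

/// Hypothetical stand-in for an exhausted child (e.g. an aggregate) that
/// still holds its registration while it is alive.
struct FinishedInput {
    _registration: Registration,
}

/// Memory share the sorter sees when it starts its merge.
fn sort_share(drop_input_early: bool) -> usize {
    let pool = Rc::new(ToyFairPool { capacity: 1000, consumers: Cell::new(0) });
    let input = FinishedInput { _registration: ToyFairPool::register(&pool) };
    let sorter = ToyFairPool::register(&pool);
    // ... the `while let Some(batch) = input.next().await` loop would run here ...
    if drop_input_early {
        drop(input); // the `drop(input);` proposed above
    }
    // Otherwise `input` lives until the end of this function.
    sorter.fair_share()
}
```

In this model sort_share(false) returns 500, because the finished input still counts as a consumer, while sort_share(true) returns 1000. A real pool's accounting is more involved, but the direction of the effect is the same.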
Describe alternatives you've considered
No response
Additional context
No response