Clean up resources of finished streams immediately. #22063

@EmilyMatt

Description

Is your feature request related to a problem or challenge?

When a child stream of an operator is exhausted, the operator should drop it immediately, instead of keeping it alive while the operator continues its own work and waiting for the owning stream to be dropped.
This can free a large amount of resources and reduce memory pressure. It also drops the child's MemoryConsumers, which lets the memory pools make better-informed decisions, and it will probably have further benefits down the line.

Describe the solution you'd like

Whenever a stream polls its child and that child returns `Poll::Ready(None)`, drop that child stream.
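A minimal sketch of that rule as a generic wrapper follows. It assumes a simplified local `Stream` trait that mirrors the shape of `futures::Stream::poll_next`, so the example needs only the standard library. `DropOnDone`, `ToyChild`, `noop_waker` and `drain_demo` are hypothetical names, not DataFusion APIs; `ToyChild` stands in for a child stream that holds a memory reservation until it is dropped.

```rust
use std::cell::Cell;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Simplified local stand-in for `futures::Stream` (assumption, std-only sketch).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical wrapper: drops its child as soon as the child returns `Ready(None)`.
struct DropOnDone<S> {
    child: Option<S>,
}

impl<S> DropOnDone<S> {
    fn new(child: S) -> Self {
        Self { child: Some(child) }
    }
    fn child_alive(&self) -> bool {
        self.child.is_some()
    }
}

impl<S: Stream + Unpin> Stream for DropOnDone<S> {
    type Item = S::Item;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let Some(child) = self.child.as_mut() else {
            return Poll::Ready(None); // child already dropped: stay exhausted
        };
        match Pin::new(child).poll_next(cx) {
            Poll::Ready(None) => {
                self.child = None; // release the child's resources right now
                Poll::Ready(None)
            }
            other => other,
        }
    }
}

/// Hypothetical child that holds a "reservation" in a shared counter until dropped.
struct ToyChild {
    items: Vec<u32>,
    reserved: Rc<Cell<usize>>,
}

impl ToyChild {
    const BYTES: usize = 1024;
    fn new(items: Vec<u32>, reserved: Rc<Cell<usize>>) -> Self {
        reserved.set(reserved.get() + Self::BYTES);
        Self { items, reserved }
    }
}

impl Drop for ToyChild {
    fn drop(&mut self) {
        self.reserved.set(self.reserved.get() - Self::BYTES);
    }
}

impl Stream for ToyChild {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        Poll::Ready(self.items.pop()) // always ready; `None` once the Vec is empty
    }
}

/// Waker that does nothing; enough for streams that never return `Pending`.
fn noop_waker() -> Waker {
    fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Drains a wrapped child; returns (items seen, child still alive, bytes still reserved).
fn drain_demo() -> (Vec<u32>, bool, usize) {
    let reserved = Rc::new(Cell::new(0));
    let mut stream = DropOnDone::new(ToyChild::new(vec![3, 2, 1], reserved.clone()));
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut seen = Vec::new();
    while let Poll::Ready(Some(x)) = Pin::new(&mut stream).poll_next(&mut cx) {
        seen.push(x);
    }
    // The parent `stream` is still alive here, but its child is already gone.
    (seen, stream.child_alive(), reserved.get())
}
```

After `drain_demo()` runs, the parent wrapper still exists while the child and its reservation have already been released. Note the difference from `futures::StreamExt::fuse`, which only stops polling an exhausted stream but keeps it (and anything it owns) alive.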
Consider, for example, the sort operator:

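```rust
// Excerpt from the sort operator's `execute`: `input` is the (mutable) child
// stream, and `futures::{StreamExt, TryStreamExt}` must be in scope.
let mut sorter = ExternalSorter::new(
    partition,
    input.schema(),
    self.expr.clone(),
    context.session_config().batch_size(),
    execution_options.sort_spill_reservation_bytes,
    execution_options.sort_in_place_threshold_bytes,
    context.session_config().spill_compression(),
    &self.metrics_set,
    context.runtime_env(),
)?;
Ok(Box::pin(RecordBatchStreamAdapter::new(
    self.schema(),
    futures::stream::once(async move {
        // Drain the child stream, inserting every batch into the sorter.
        while let Some(batch) = input.next().await {
            let batch = batch?;
            sorter.insert_batch(batch).await?;
        }
        // `input` is exhausted here, but this future still owns it (and any
        // memory reservations it holds) while the sort runs.
        sorter.sort().await
    })
    .try_flatten(),
)))
```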

This stream is memory-intensive in general, and the final `sorter.sort().await` can start an expensive merge whose performance is severely limited by the available memory.
With a naive fair pool, an aggregate feeding into that sort keeps half of the pool reserved for itself, memory the sort cannot use, even though the aggregate has already finished its work.
Now imagine adding `drop(input);` right after the `while` loop: the aggregate's consumer is released, the sort becomes the only consumer, and `sorter.sort().await` is given twice the allocation from the pool.
The same logic can be applied pretty much everywhere.
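To make the 2x arithmetic concrete, here is a toy model, assuming a pool that simply splits its capacity evenly among the consumers currently registered. It is not DataFusion's actual `FairSpillPool`; `FairPool`, `Consumer` and `sort_share_demo` are hypothetical names, and the 1 GiB capacity is arbitrary.

```rust
use std::cell::RefCell;
use std::collections::BTreeSet;
use std::rc::Rc;

/// Hypothetical "naive fair" pool: capacity is split evenly among registered consumers.
struct FairPool {
    capacity: usize,
    consumers: RefCell<BTreeSet<&'static str>>,
}

impl FairPool {
    /// Per-consumer share given the consumers registered right now.
    fn share(&self) -> usize {
        self.capacity / self.consumers.borrow().len().max(1)
    }
}

/// Hypothetical registration handle; dropping it unregisters the consumer.
struct Consumer {
    name: &'static str,
    pool: Rc<FairPool>,
}

impl Consumer {
    fn register(name: &'static str, pool: &Rc<FairPool>) -> Self {
        pool.consumers.borrow_mut().insert(name);
        Self { name, pool: Rc::clone(pool) }
    }
}

impl Drop for Consumer {
    fn drop(&mut self) {
        self.pool.consumers.borrow_mut().remove(self.name);
    }
}

/// Returns the sort's share (in bytes) before and after dropping the finished aggregate.
fn sort_share_demo() -> (usize, usize) {
    let pool = Rc::new(FairPool { capacity: 1 << 30, consumers: RefCell::new(BTreeSet::new()) });
    let aggregate = Consumer::register("aggregate", &pool);
    let _sort = Consumer::register("sort", &pool);
    let before = pool.share(); // the aggregate is done but still registered
    drop(aggregate); // analogous to `drop(input);` after the `while` loop
    let after = pool.share();
    (before, after)
}
```

With two registered consumers the sort sees half the capacity; once the finished aggregate is dropped, the same call returns the full capacity, which is the 2x described above.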

Describe alternatives you've considered

No response

Additional context

No response

Metadata

Assignees

No one assigned

    Labels

    enhancement (New feature or request)

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests