Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
fd0f896
Add a default FileStatisticsCache implementation for the ListingTable
mkleen Jan 18, 2026
2ff77a9
fixup! Add a default FileStatisticsCache implementation for the Listi…
mkleen Jan 28, 2026
8b2d6ef
Adapt memory usage when removing entries
mkleen Feb 4, 2026
95a08d8
Adapt heapsize for &str
mkleen Feb 4, 2026
6da78ce
Fix formatting
mkleen Feb 4, 2026
bea408d
Adapt heapsize for &str and add another scalarvalue
mkleen Feb 4, 2026
8f7f9d2
Add better error message
mkleen Feb 10, 2026
cd56471
Add todo to add heapsize for ordering in CachedFileMetadata
mkleen Feb 10, 2026
e2c2463
Fix comment/docs on DefaultFileStatisticsCache
mkleen Feb 10, 2026
07b0c00
Simplify test data generation
mkleen Feb 10, 2026
37385bb
Remove potential stale entry, if entry is too large
mkleen Feb 10, 2026
960357d
Fix typo in sql logic test comment
mkleen Feb 10, 2026
03ac26d
Fix comment about default behaviour in cache manager
mkleen Feb 10, 2026
213e7b3
Fix variable name in test
mkleen Feb 10, 2026
83c1ce5
Fix variable name in test
mkleen Feb 10, 2026
d41416b
Disable cache for sql logic test
mkleen Feb 10, 2026
0426037
Include key into memory estimation
mkleen Feb 11, 2026
264b251
Fix fmt
mkleen Feb 11, 2026
ddf5b6b
Fix clippy
mkleen Feb 11, 2026
dacbcf2
minor
mkleen Feb 11, 2026
d7066b0
Add more key memory accounting
mkleen Feb 12, 2026
d6aa6c4
Fix Formatting
mkleen Feb 12, 2026
4d93c31
Account path as string and remove dependency to object_store
mkleen Feb 12, 2026
98ea7fa
Improve error handling
mkleen Feb 12, 2026
20cd255
Fix fmt
mkleen Feb 12, 2026
c4c782f
Remove path.clone
mkleen Feb 12, 2026
0dead2e
Simplify accounting for statistics
mkleen Feb 12, 2026
63201a0
Adapt offset buffer
mkleen Feb 12, 2026
71ff66e
Fix heap size for Arc
mkleen Feb 12, 2026
4a2de63
Adapt estimate in test
mkleen Feb 12, 2026
71405fc
Fix sql logic test
mkleen Feb 12, 2026
326b46e
Register cache from cachemanager at listing table
mkleen Apr 8, 2026
12b2d81
Revert slt
mkleen Apr 8, 2026
fbb0dbb
Add tablescoping for file stats cache
mkleen Feb 18, 2026
7ff7021
Adapt slt
mkleen Apr 9, 2026
f1db937
Fix linter
mkleen Apr 9, 2026
1069914
Remove unneeded clone
mkleen Apr 9, 2026
36ebf45
Rename cache_unit to file_statistics_cache
mkleen Apr 9, 2026
ecedf49
Simplify heap size accounting
mkleen Apr 9, 2026
ef4bb12
Adapt comments in test
mkleen Apr 10, 2026
bfa78c3
Separate drop table clean-ups
mkleen Apr 10, 2026
242f151
fixup! Separate drop table clean-ups
mkleen Apr 10, 2026
210ba77
Increase default limit to 10 mb
mkleen Apr 15, 2026
f2ea873
Increase default limit to 20 mb
mkleen Apr 15, 2026
6eae740
Fix comment
mkleen Apr 15, 2026
25e3a03
Fix deregister logic
mkleen Apr 15, 2026
37cdf7a
Fix slt
mkleen Apr 15, 2026
daa3cb7
Add table reference to FileStatisticsCacheEntry
mkleen Apr 15, 2026
be0c170
fixup! Add table reference to FileStatisticsCacheEntry
mkleen Apr 15, 2026
105ec9a
Fix comment
mkleen Apr 15, 2026
ff29eb4
Fix runtime_env entry
mkleen Apr 19, 2026
9ed435f
Add cache for all benchmark runs
mkleen Apr 21, 2026
1f9fe95
Add cache to listing table creation
mkleen Apr 21, 2026
7babcd4
fixup! Add cache to listing table creation
mkleen Apr 21, 2026
87b0773
Adapt limit to 20M in configs.md
mkleen Apr 22, 2026
0759d2e
fixup! Adapt limit to 20M in configs.md
mkleen Apr 22, 2026
e35230a
Fix linter
mkleen Apr 22, 2026
4f71c32
Add cache to listing table in _read_type()
mkleen Apr 22, 2026
e7f4607
Add ListView and LargeListView to heapsize
mkleen Apr 22, 2026
bfa54a9
fixup! Add ListView and LargeListView to heapsize
mkleen Apr 22, 2026
8ea3af1
Remove array.slt
mkleen Apr 22, 2026
be1c766
Add table ref to ListingTableUrl
mkleen Apr 23, 2026
3b30524
Add heapsize for table-scoped-path
mkleen Apr 23, 2026
ffcd39d
Make list_entries table-scoped
mkleen Apr 23, 2026
78abfb1
fixup! Make list_entries table-scoped
mkleen Apr 23, 2026
87d7418
fixup! fixup! Make list_entries table-scoped
mkleen Apr 23, 2026
174328e
Improve heap size estimation for Arc
mkleen Apr 26, 2026
2b84a82
fixup! Improve heap size estimation for Arc
mkleen Apr 27, 2026
eda9d32
Update migration guide
mkleen Apr 27, 2026
9c185e3
fixup! Update migration guide
mkleen Apr 27, 2026
98c3ca9
Improve heapsize estimation for TableReference
mkleen Apr 29, 2026
640a0e7
Improve memory handling when inserting
mkleen Apr 29, 2026
280897a
Fix comments in Cache Manager
mkleen Apr 29, 2026
e0a8ee9
Improve upgrade guide
mkleen Apr 29, 2026
990e506
Fix upgrade guide
mkleen Apr 29, 2026
f679906
Return stale entries from cache
mkleen May 4, 2026
89bbc7d
Fix upgrade guide
mkleen May 4, 2026
358dd43
Fix Arc<str> heapsize test
mkleen May 5, 2026
680da7f
Remove const i32 cast from heapsize estimation
mkleen May 5, 2026
89b4e34
Fix heapsize estimation for Arc<T>
mkleen May 5, 2026
027dbaa
Fix comment in cache_manager
mkleen May 5, 2026
ba71bde
Fix linter + clippy
mkleen May 5, 2026
535352b
Adapt test according to heapsize estimation changes
mkleen May 5, 2026
14d97bd
Always add tableref to partitioned files
mkleen May 8, 2026
3034387
fixup! Always add tableref to partitioned files
mkleen May 8, 2026
90264e6
Add table to statistics_cache output
mkleen May 8, 2026
06ad6d9
Adopt test to new output
mkleen May 8, 2026
df2ec4d
Adopt configuration with '0' value
mkleen May 8, 2026
8cc9b3d
Update configs.md
mkleen May 8, 2026
ce0baeb
Add reset after show in slt
mkleen May 9, 2026
63fafe4
Extract cache invalidation logic
mkleen May 10, 2026
b3649dd
Merge branch 'main' into file-stats-cache
mkleen May 11, 2026
f340fe8
Merge branch 'main' into file-stats-cache
mkleen May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ impl ExternalAggrConfig {
let config = ListingTableConfig::new(table_path).with_listing_options(options);
let config = config.infer_schema(&state).await?;

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,9 @@ impl RunOpt {
_ => unreachable!(),
};

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/tpcds/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
7 changes: 6 additions & 1 deletion datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ impl TableFunctionImpl for StatisticsCacheFunc {

let schema = Arc::new(Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new("table", DataType::Utf8, false),
Field::new(
"file_modified",
DataType::Timestamp(TimeUnit::Millisecond, None),
Expand All @@ -649,6 +650,7 @@ impl TableFunctionImpl for StatisticsCacheFunc {

// construct record batch from metadata
let mut path_arr = vec![];
let mut table_arr = vec![];
let mut file_modified_arr = vec![];
let mut file_size_bytes_arr = vec![];
let mut e_tag_arr = vec![];
Expand All @@ -661,7 +663,9 @@ impl TableFunctionImpl for StatisticsCacheFunc {
if let Some(file_statistics_cache) = self.cache_manager.get_file_statistic_cache()
{
for (path, entry) in file_statistics_cache.list_entries() {
path_arr.push(path.to_string());
path_arr.push(path.path.to_string());
table_arr
.push(path.table.map_or_else(|| "".to_string(), |t| t.to_string()));
file_modified_arr
.push(Some(entry.object_meta.last_modified.timestamp_millis()));
file_size_bytes_arr.push(entry.object_meta.size);
Expand All @@ -678,6 +682,7 @@ impl TableFunctionImpl for StatisticsCacheFunc {
schema.clone(),
vec![
Arc::new(StringArray::from(path_arr)),
Arc::new(StringArray::from(table_arr)),
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
Arc::new(UInt64Array::from(file_size_bytes_arr)),
Arc::new(StringArray::from(e_tag_arr)),
Expand Down
74 changes: 9 additions & 65 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,7 @@ mod tests {
use super::*;
use datafusion::{
common::test_util::batches_to_string,
execution::cache::{
DefaultListFilesCache, cache_manager::CacheManagerConfig,
cache_unit::DefaultFileStatisticsCache,
},
execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig},
prelude::{ParquetReadOptions, col, lit, split_part},
};
use insta::assert_snapshot;
Expand Down Expand Up @@ -656,8 +653,6 @@ mod tests {
Ok(())
}

/// Shows that the statistics cache is not enabled by default yet
/// See https://github.com/apache/datafusion/issues/19217
#[tokio::test]
async fn test_statistics_cache_default() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
Expand Down Expand Up @@ -687,68 +682,17 @@ mod tests {
.await?;
}

// When the cache manager creates a StatisticsCache by default,
// the contents will show up here
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let sql = "SELECT split_part(path, '/', -1) as filename, table, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
++
++
");

Ok(())
}

// Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved
#[tokio::test]
async fn test_statistics_cache_override() -> Result<(), DataFusionError> {
// Install a specific StatisticsCache implementation
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
let cache_config = CacheManagerConfig::default()
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
let runtime = RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()?;
let config = SessionConfig::new().with_collect_statistics(true);
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));

ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

for filename in [
"alltypes_plain",
"alltypes_tiny_pages",
"lz4_raw_compressed_larger",
] {
ctx.sql(
format!(
"create external table {filename}
stored as parquet
location '../parquet-testing/data/{filename}.parquet'",
)
.as_str(),
)
.await?
.collect()
.await?;
}

let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+-----------------------------------+-----------------+--------------+-------------+------------------+
| filename | file_size_bytes | num_rows | num_columns | table_size_bytes |
+-----------------------------------+-----------------+--------------+-------------+------------------+
| alltypes_plain.parquet | 1851 | Exact(8) | 11 | Absent |
| alltypes_tiny_pages.parquet | 454233 | Exact(7300) | 13 | Absent |
| lz4_raw_compressed_larger.parquet | 380836 | Exact(10000) | 1 | Absent |
+-----------------------------------+-----------------+--------------+-------------+------------------+
+-----------------------------------+---------------------------+-----------------+--------------+-------------+------------------+
| filename | table | file_size_bytes | num_rows | num_columns | table_size_bytes |
+-----------------------------------+---------------------------+-----------------+--------------+-------------+------------------+
| alltypes_plain.parquet | alltypes_plain | 1851 | Exact(8) | 11 | Absent |
Comment thread
mkleen marked this conversation as resolved.
| alltypes_tiny_pages.parquet | alltypes_tiny_pages | 454233 | Exact(7300) | 13 | Absent |
| lz4_raw_compressed_larger.parquet | lz4_raw_compressed_larger | 380836 | Exact(10000) | 1 | Absent |
+-----------------------------------+---------------------------+-----------------+--------------+-------------+------------------+
");

Ok(())
Expand Down
34 changes: 30 additions & 4 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ use std::mem;
use std::sync::Arc;

use datafusion_catalog::Session;
use datafusion_common::{HashMap, Result, ScalarValue, assert_or_internal_err};
use datafusion_datasource::ListingTableUrl;
use datafusion_common::{
HashMap, Result, ScalarValue, TableReference, assert_or_internal_err,
};
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::{FileExtensions, ListingTableUrl};
use datafusion_expr::{BinaryExpr, Operator, lit, utils};

use arrow::{
Expand Down Expand Up @@ -382,6 +384,7 @@ fn try_into_partitioned_file(

let mut pf: PartitionedFile = object_meta.into();
pf.partition_values = partition_values;
pf.table_reference.clone_from(table_path.get_table_ref());

Ok(Some(pf))
}
Expand Down Expand Up @@ -416,8 +419,15 @@ pub async fn pruned_partition_list<'a>(
table_path
);

// if no partition col => simply list all the files
Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
// if no partition col => list all the files
Ok(objects
.try_filter_map(|object_meta| {
futures::future::ready(object_meta_to_partitioned_file(
object_meta,
table_path.get_table_ref(),
Copy link
Copy Markdown
Contributor Author

@mkleen mkleen May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Table-reference needs to be always passed on the PartitionedFile because file statistics cache is now table-scoped and need the Table-reference for the caching for the key.

))
})
.boxed())
} else {
let df_schema = DFSchema::from_unqualified_fields(
partition_cols
Expand All @@ -442,6 +452,22 @@ pub async fn pruned_partition_list<'a>(
}
}

/// Wraps an [`ObjectMeta`] in a [`PartitionedFile`] with no partition values,
/// attaching the (optional) table reference so downstream caches can key
/// entries per table.
fn object_meta_to_partitioned_file(
    object_meta: ObjectMeta,
    table_ref: &Option<TableReference>,
) -> Result<Option<PartitionedFile>> {
    // Build the file entry explicitly; all optional metadata starts unset and
    // is filled in later by statistics collection / range assignment.
    let file = PartitionedFile {
        object_meta,
        partition_values: Vec::new(),
        range: None,
        statistics: None,
        ordering: None,
        extensions: FileExtensions::new(),
        metadata_size_hint: None,
        table_reference: table_ref.clone(),
    };
    Ok(Some(file))
}

/// Extract the partition values for the given `file_path` (in the given `table_path`)
/// associated to the partitions defined by `table_partition_cols`
pub fn parse_partitions_for_path<'a, I>(
Expand Down
35 changes: 19 additions & 16 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use datafusion_datasource::{
};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
Expand Down Expand Up @@ -187,7 +186,7 @@ pub struct ListingTable {
/// The SQL definition for this table, if any
definition: Option<String>,
/// Cache for collected file statistics
collected_statistics: Arc<dyn FileStatisticsCache>,
collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
/// Constraints applied to this table
constraints: Constraints,
/// Column default expressions for columns that are not physically present in the data files
Expand Down Expand Up @@ -231,7 +230,7 @@ impl ListingTable {
schema_source,
options,
definition: None,
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
collected_statistics: None,
constraints: Constraints::default(),
column_defaults: HashMap::new(),
expr_adapter_factory: config.expr_adapter_factory,
Expand Down Expand Up @@ -260,10 +259,8 @@ impl ListingTable {
/// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
/// multiple times in the same session.
///
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
self.collected_statistics =
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
self.collected_statistics = cache;
self
}

Expand Down Expand Up @@ -802,11 +799,15 @@ impl ListingTable {
) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
use datafusion_execution::cache::cache_manager::CachedFileMetadata;

let path = &part_file.object_meta.location;
let path = TableScopedPath {
table: part_file.table_reference.clone(),
path: part_file.object_meta.location.clone(),
};
let meta = &part_file.object_meta;

// Check cache first - if we have valid cached statistics and ordering
if let Some(cached) = self.collected_statistics.get(path)
if let Some(cache) = &self.collected_statistics
&& let Some(cached) = cache.get(&path)
&& cached.is_valid_for(meta)
{
// Return cached statistics and ordering
Expand All @@ -823,14 +824,16 @@ impl ListingTable {
let statistics = Arc::new(file_meta.statistics);

// Store in cache
self.collected_statistics.put(
path,
CachedFileMetadata::new(
meta.clone(),
Arc::clone(&statistics),
file_meta.ordering.clone(),
),
);
if let Some(cache) = &self.collected_statistics {
cache.put(
&path,
CachedFileMetadata::new(
meta.clone(),
Arc::clone(&statistics),
file_meta.ordering.clone(),
),
);
}

Ok((statistics, file_meta.ordering))
}
Expand Down
Loading
Loading