diff --git a/Cargo.lock b/Cargo.lock index 79538f0b7f3..5ff72208310 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1924,6 +1924,7 @@ version = "0.1.0" dependencies = [ "anyhow", "arrow-ipc", + "async-trait", "clap", "custom-labels", "datafusion 52.1.0", @@ -1939,6 +1940,7 @@ dependencies = [ "parking_lot", "tokio", "url", + "vortex", "vortex-bench", "vortex-cuda", "vortex-datafusion", @@ -10226,7 +10228,6 @@ dependencies = [ "futures", "insta", "itertools 0.14.0", - "moka", "object_store", "rstest", "tempfile", @@ -10280,6 +10281,7 @@ dependencies = [ "cc", "custom-labels", "futures", + "glob", "itertools 0.14.0", "jiff", "num-traits", @@ -10369,12 +10371,14 @@ dependencies = [ "glob", "itertools 0.14.0", "kanal", + "moka", "object_store", "oneshot", "parking_lot", "pin-project-lite", "tokio", "tracing", + "url", "uuid", "vortex-alp", "vortex-array", @@ -10389,6 +10393,7 @@ dependencies = [ "vortex-fsst", "vortex-io", "vortex-layout", + "vortex-mask", "vortex-metrics", "vortex-pco", "vortex-runend", @@ -10671,6 +10676,7 @@ dependencies = [ "bit-vec 0.8.0", "futures", "itertools 0.14.0", + "kanal", "parking_lot", "roaring 0.11.3", "sketches-ddsketch", diff --git a/bench-orchestrator/bench_orchestrator/comparison/analyzer.py b/bench-orchestrator/bench_orchestrator/comparison/analyzer.py index 5fa05c40d4a..7ac62f188ff 100644 --- a/bench-orchestrator/bench_orchestrator/comparison/analyzer.py +++ b/bench-orchestrator/bench_orchestrator/comparison/analyzer.py @@ -271,8 +271,11 @@ def compare_runs( # Pivot to get (query, engine, format) as rows, runs as columns pivot = combined.pivot_table(index=["query", "engine", "format"], columns="run", values="value", aggfunc="mean") + # Deduplicate labels while preserving order (two runs can share a label). 
+ unique_labels = list(dict.fromkeys(labels)) + # Reorder columns to match input order - pivot = pivot[[label for label in labels if label in pivot.columns]] + pivot = pivot[[label for label in unique_labels if label in pivot.columns]] # Compute ratios relative to baseline if baseline_label in pivot.columns: @@ -283,4 +286,4 @@ def compare_runs( else: result = pivot - return PivotComparison(df=result.reset_index(), baseline=baseline_label, columns=labels) + return PivotComparison(df=result.reset_index(), baseline=baseline_label, columns=unique_labels) diff --git a/bench-orchestrator/bench_orchestrator/storage/store.py b/bench-orchestrator/bench_orchestrator/storage/store.py index 96847ac5df2..1703af1ad6f 100644 --- a/bench-orchestrator/bench_orchestrator/storage/store.py +++ b/bench-orchestrator/bench_orchestrator/storage/store.py @@ -96,9 +96,13 @@ def write_result(self, result: QueryResult) -> None: self._result_count += 1 def write_raw_json(self, json_line: str) -> None: - """Write a raw JSON line directly (from benchmark binary output).""" - if self._results_file: - self._results_file.write(json_line.strip() + "\n") + """Write a raw JSON line directly (from benchmark binary output). + + Non-JSON lines (e.g. DuckDB ASCII table output) are silently skipped. 
+ """ + line = json_line.strip() + if self._results_file and line.startswith("{"): + self._results_file.write(line + "\n") self._results_file.flush() self._result_count += 1 diff --git a/benchmarks/datafusion-bench/Cargo.toml b/benchmarks/datafusion-bench/Cargo.toml index 7ac28193188..2c3ddc4bce5 100644 --- a/benchmarks/datafusion-bench/Cargo.toml +++ b/benchmarks/datafusion-bench/Cargo.toml @@ -17,6 +17,7 @@ publish = false [dependencies] anyhow = { workspace = true } arrow-ipc.workspace = true +async-trait = { workspace = true } clap = { workspace = true, features = ["derive"] } custom-labels = { workspace = true } datafusion = { workspace = true, features = [ @@ -36,6 +37,7 @@ opentelemetry_sdk.workspace = true parking_lot = { workspace = true } tokio = { workspace = true, features = ["full"] } url = { workspace = true } +vortex = { workspace = true, features = ["object_store", "files", "tokio"] } vortex-bench = { workspace = true } vortex-cuda = { workspace = true, optional = true } vortex-datafusion = { workspace = true } diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs index 4ee078dd2dd..f915c9c7d3e 100644 --- a/benchmarks/datafusion-bench/src/main.rs +++ b/benchmarks/datafusion-bench/src/main.rs @@ -26,6 +26,7 @@ use datafusion_physical_plan::collect; use futures::StreamExt; use parking_lot::Mutex; use tokio::fs::File; +use vortex::scan::api::DataSourceRef; use vortex_bench::Benchmark; use vortex_bench::BenchmarkArg; use vortex_bench::CompactionStrategy; @@ -33,6 +34,7 @@ use vortex_bench::Engine; use vortex_bench::Format; use vortex_bench::Opt; use vortex_bench::Opts; +use vortex_bench::SESSION; use vortex_bench::conversions::convert_parquet_directory_to_vortex; use vortex_bench::create_benchmark; use vortex_bench::create_output_writer; @@ -220,6 +222,10 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +fn use_scan_api() -> bool { + std::env::var("VORTEX_USE_SCAN_API").is_ok_and(|v| v == "1") +} + async fn 
register_benchmark_tables( session: &SessionContext, benchmark: &B, @@ -227,6 +233,9 @@ async fn register_benchmark_tables( ) -> anyhow::Result<()> { match format { Format::Arrow => register_arrow_tables(session, benchmark).await, + _ if use_scan_api() && matches!(format, Format::OnDiskVortex | Format::VortexCompact) => { + register_v2_tables(session, benchmark, format).await + } _ => { let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?; let file_format = format_to_df_format(format); @@ -265,6 +274,54 @@ async fn register_benchmark_tables( } } +/// Register tables using the V2 `VortexTable` + `MultiFileDataSource` path. +async fn register_v2_tables( + session: &SessionContext, + benchmark: &B, + format: Format, +) -> anyhow::Result<()> { + use vortex::file::multi::MultiFileDataSource; + use vortex::io::object_store::ObjectStoreFileSystem; + use vortex::io::session::RuntimeSessionExt; + use vortex::scan::api::DataSource as _; + use vortex_datafusion::v2::VortexTable; + + let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?; + + for table in benchmark.table_specs().iter() { + let pattern = benchmark.pattern(table.name, format); + let table_url = ListingTableUrl::try_new(benchmark_base.clone(), pattern.clone())?; + let store = session + .state() + .runtime_env() + .object_store(table_url.object_store())?; + + let fs: vortex::io::filesystem::FileSystemRef = + Arc::new(ObjectStoreFileSystem::new(store.clone(), SESSION.handle())); + let base_prefix = benchmark_base.path().trim_start_matches('/').to_string(); + let fs = fs.with_prefix(base_prefix); + + let glob_pattern = match &pattern { + Some(p) => p.as_str().to_string(), + None => format!("*.{}", format.ext()), + }; + + let multi_ds = MultiFileDataSource::new(SESSION.clone()) + .with_filesystem(fs) + .with_glob(glob_pattern) + .build() + .await?; + + let arrow_schema = Arc::new(multi_ds.dtype().to_arrow_schema()?); + let data_source: DataSourceRef = 
Arc::new(multi_ds); + + let table_provider = Arc::new(VortexTable::new(data_source, SESSION.clone(), arrow_schema)); + session.register_table(table.name, table_provider)?; + } + + Ok(()) +} + /// Load Arrow IPC files into in-memory DataFusion tables. async fn register_arrow_tables( session: &SessionContext, diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs index f25682038c8..96943f8064d 100644 --- a/benchmarks/duckdb-bench/src/main.rs +++ b/benchmarks/duckdb-bench/src/main.rs @@ -3,6 +3,7 @@ mod validation; +use std::ops::Deref; use std::path::PathBuf; use clap::Parser; @@ -68,6 +69,10 @@ struct Args { #[arg(long = "opt", value_delimiter = ',', value_parser = value_parser!(Opt))] options: Vec, + + /// Print EXPLAIN output for each query instead of running benchmarks. + #[arg(long, default_value_t = false)] + explain: bool, } fn main() -> anyhow::Result<()> { @@ -126,6 +131,33 @@ fn main() -> anyhow::Result<()> { })?; } + if args.explain { + for format in &args.formats { + let ctx = DuckClient::new( + &*benchmark, + *format, + args.delete_duckdb_database, + args.threads, + )?; + ctx.register_tables(&*benchmark, *format)?; + + for (query_idx, query) in &filtered_queries { + println!("=== Q{query_idx} [{format}] ==="); + println!("{query}"); + println!(); + let result = ctx.connection.query(&format!("EXPLAIN {query}"))?; + for chunk in result { + let chunk_str = + String::try_from(chunk.deref()).unwrap_or_else(|_| "".to_string()); + println!("{chunk_str}"); + } + println!(); + } + } + + return Ok(()); + } + let mut runner = SqlBenchmarkRunner::new( &*benchmark, Engine::DuckDB, diff --git a/docs/developer-guide/integrations/duckdb.md b/docs/developer-guide/integrations/duckdb.md index 398da841854..060f5fca973 100644 --- a/docs/developer-guide/integrations/duckdb.md +++ b/docs/developer-guide/integrations/duckdb.md @@ -17,10 +17,9 @@ concurrently during the global initialization phase, with a concurrency limit pr the number of DuckDB 
worker threads to keep the I/O pipeline saturated without overwhelming the system. -A `MultiScan` stream manages the set of active file scans. It prioritises completing in-progress -scans before opening new files, ensuring that DuckDB's execution threads always have data to -consume while background I/O proceeds. File footers are cached to avoid redundant parsing when -the same file appears in multiple queries. +Active file scans are driven concurrently via `try_flatten_unordered`, ensuring that DuckDB's +execution threads always have data to consume while background I/O proceeds. File footers are +cached to avoid redundant parsing when the same file appears in multiple queries. ## Threading Model diff --git a/vortex-array/src/expr/stats/precision.rs b/vortex-array/src/expr/stats/precision.rs index 673bcc026e4..671401f1936 100644 --- a/vortex-array/src/expr/stats/precision.rs +++ b/vortex-array/src/expr/stats/precision.rs @@ -21,6 +21,8 @@ use crate::scalar::ScalarValue; /// This is statistic specific, for max this will be an upper bound. Meaning that the actual max /// in an array is guaranteed to be less than or equal to the inexact value, but equal to the exact /// value. +/// +// TODO(ngates): should we model Unknown as a variant of Precision? Or have Option>? 
#[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum Precision { Exact(T), diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index e50363a016d..8a9583105fc 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -31,7 +31,6 @@ datafusion-physical-plan = { workspace = true } datafusion-pruning = { workspace = true } futures = { workspace = true } itertools = { workspace = true } -moka = { workspace = true, features = ["future"] } object_store = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } tokio-stream = { workspace = true } diff --git a/vortex-datafusion/public-api.lock b/vortex-datafusion/public-api.lock index 45691c15b77..0dd1d1d14b6 100644 --- a/vortex-datafusion/public-api.lock +++ b/vortex-datafusion/public-api.lock @@ -18,12 +18,76 @@ pub type vortex_datafusion::metrics::VortexMetricsFinder::Error = core::convert: pub fn vortex_datafusion::metrics::VortexMetricsFinder::pre_visit(&mut self, plan: &dyn datafusion_physical_plan::execution_plan::ExecutionPlan) -> core::result::Result +pub mod vortex_datafusion::v2 + +pub struct vortex_datafusion::v2::VortexDataSource + +impl vortex_datafusion::v2::VortexDataSource + +pub fn vortex_datafusion::v2::VortexDataSource::builder(data_source: vortex_scan::api::DataSourceRef, session: vortex_session::VortexSession) -> vortex_datafusion::v2::source::VortexDataSourceBuilder + +impl core::clone::Clone for vortex_datafusion::v2::VortexDataSource + +pub fn vortex_datafusion::v2::VortexDataSource::clone(&self) -> vortex_datafusion::v2::VortexDataSource + +impl core::fmt::Debug for vortex_datafusion::v2::VortexDataSource + +pub fn vortex_datafusion::v2::VortexDataSource::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +impl datafusion_datasource::source::DataSource for vortex_datafusion::v2::VortexDataSource + +pub fn vortex_datafusion::v2::VortexDataSource::as_any(&self) -> &dyn core::any::Any + +pub fn 
vortex_datafusion::v2::VortexDataSource::eq_properties(&self) -> datafusion_physical_expr::equivalence::properties::EquivalenceProperties + +pub fn vortex_datafusion::v2::VortexDataSource::fetch(&self) -> core::option::Option + +pub fn vortex_datafusion::v2::VortexDataSource::fmt_as(&self, _t: datafusion_physical_plan::display::DisplayFormatType, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result + +pub fn vortex_datafusion::v2::VortexDataSource::open(&self, partition: usize, _context: alloc::sync::Arc) -> datafusion_common::error::Result + +pub fn vortex_datafusion::v2::VortexDataSource::output_partitioning(&self) -> datafusion_physical_expr::partitioning::Partitioning + +pub fn vortex_datafusion::v2::VortexDataSource::partition_statistics(&self, _partition: core::option::Option) -> datafusion_common::error::Result + +pub fn vortex_datafusion::v2::VortexDataSource::repartitioned(&self, target_partitions: usize, _repartition_file_min_size: usize, output_ordering: core::option::Option) -> datafusion_common::error::Result>> + +pub fn vortex_datafusion::v2::VortexDataSource::try_pushdown_filters(&self, filters: alloc::vec::Vec>, _config: &datafusion_common::config::ConfigOptions) -> datafusion_common::error::Result>> + +pub fn vortex_datafusion::v2::VortexDataSource::try_swapping_with_projection(&self, projection: &datafusion_physical_expr::projection::ProjectionExprs) -> datafusion_common::error::Result>> + +pub fn vortex_datafusion::v2::VortexDataSource::with_fetch(&self, limit: core::option::Option) -> core::option::Option> + +pub struct vortex_datafusion::v2::VortexTable + +impl vortex_datafusion::v2::VortexTable + +pub fn vortex_datafusion::v2::VortexTable::new(data_source: vortex_scan::api::DataSourceRef, session: vortex_session::VortexSession, arrow_schema: arrow_schema::schema::SchemaRef) -> Self + +impl core::fmt::Debug for vortex_datafusion::v2::VortexTable + +pub fn vortex_datafusion::v2::VortexTable::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> 
core::fmt::Result + +impl datafusion_catalog::table::TableProvider for vortex_datafusion::v2::VortexTable + +pub fn vortex_datafusion::v2::VortexTable::as_any(&self) -> &dyn core::any::Any + +pub fn vortex_datafusion::v2::VortexTable::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, projection: core::option::Option<&'life2 alloc::vec::Vec>, _filters: &'life3 [datafusion_expr::expr::Expr], _limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait + +pub fn vortex_datafusion::v2::VortexTable::schema(&self) -> arrow_schema::schema::SchemaRef + +pub fn vortex_datafusion::v2::VortexTable::statistics(&self) -> core::option::Option + +pub fn vortex_datafusion::v2::VortexTable::table_type(&self) -> datafusion_expr::table_source::TableType + pub struct vortex_datafusion::VortexAccessPlan impl vortex_datafusion::VortexAccessPlan pub fn vortex_datafusion::VortexAccessPlan::apply_to_builder(&self, scan_builder: vortex_scan::scan_builder::ScanBuilder) -> vortex_scan::scan_builder::ScanBuilder where A: 'static + core::marker::Send +pub fn vortex_datafusion::VortexAccessPlan::selection(&self) -> core::option::Option<&vortex_scan::selection::Selection> + impl vortex_datafusion::VortexAccessPlan pub fn vortex_datafusion::VortexAccessPlan::with_selection(self, selection: vortex_scan::selection::Selection) -> Self diff --git a/vortex-datafusion/src/convert/mod.rs b/vortex-datafusion/src/convert/mod.rs index 84fcc333086..050987522b7 100644 --- a/vortex-datafusion/src/convert/mod.rs +++ b/vortex-datafusion/src/convert/mod.rs @@ -6,6 +6,7 @@ use vortex::error::VortexResult; pub(crate) mod exprs; mod scalars; pub(crate) mod schema; +pub(crate) mod stats; /// First-party trait for implementing conversion from DataFusion types to Vortex types. 
pub(crate) trait FromDataFusion: Sized { diff --git a/vortex-datafusion/src/convert/stats.rs b/vortex-datafusion/src/convert/stats.rs new file mode 100644 index 00000000000..2a6b1d0996f --- /dev/null +++ b/vortex-datafusion/src/convert/stats.rs @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use datafusion_common::ColumnStatistics; +use datafusion_common::stats::Precision; +use vortex::array::stats::StatsSet; +use vortex::dtype::DType; +use vortex::dtype::Nullability; +use vortex::dtype::PType; +use vortex::error::VortexExpect; +use vortex::error::VortexResult; +use vortex::expr::stats::Stat; +use vortex::scalar::Scalar; + +use crate::PrecisionExt; +use crate::convert::TryToDataFusion; + +/// Convert a stats set for an array with the given dtype. +pub(crate) fn stats_set_to_df( + stats_set: &StatsSet, + dtype: &DType, +) -> VortexResult { + // Update the total size in bytes. + let column_size = stats_set.get_as::(Stat::UncompressedSizeInBytes, &PType::U64.into()); + + // TODO(connor): There's a lot that can go wrong here, should probably handle this + // more gracefully... + // Find the min statistic. + let min = stats_set.get(Stat::Min).and_then(|pstat_val| { + pstat_val + .map(|stat_val| { + Scalar::try_new( + Stat::Min + .dtype(dtype) + .vortex_expect("must have a valid dtype"), + Some(stat_val), + ) + .vortex_expect("`Stat::Min` somehow had an incompatible `DType`") + .try_to_df() + .ok() + }) + .transpose() + }); + + // Find the max statistic. 
+ let max = stats_set.get(Stat::Max).and_then(|pstat_val| { + pstat_val + .map(|stat_val| { + Scalar::try_new( + Stat::Max + .dtype(dtype) + .vortex_expect("must have a valid dtype"), + Some(stat_val), + ) + .vortex_expect("`Stat::Max` somehow had an incompatible `DType`") + .try_to_df() + .ok() + }) + .transpose() + }); + + // Find the sum statistic + let sum = stats_set.get(Stat::Sum).and_then(|pstat_val| { + pstat_val + .map(|stat_val| { + Scalar::try_new( + Stat::Sum + .dtype(dtype) + .vortex_expect("must have a valid dtype"), + Some(stat_val), + ) + .vortex_expect("`Stat::Sum` somehow had an incompatible `DType`") + .try_to_df() + .ok() + }) + .transpose() + }); + + let null_count = stats_set.get_as::(Stat::NullCount, &PType::U64.into()); + + Ok(ColumnStatistics { + null_count: null_count.to_df(), + min_value: min.to_df(), + max_value: max.to_df(), + sum_value: sum.to_df(), + distinct_count: stats_set + .get_as::(Stat::IsConstant, &DType::Bool(Nullability::NonNullable)) + .and_then(|is_constant| is_constant.as_exact().map(|_| Precision::Exact(1))) + .unwrap_or(Precision::Absent), + byte_size: column_size.to_df(), + }) +} diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index e8ad53d730a..8e7fc57dbf5 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -10,6 +10,7 @@ use vortex::expr::stats::Precision; mod convert; mod persistent; +pub mod v2; #[cfg(test)] mod tests; diff --git a/vortex-datafusion/src/persistent/access_plan.rs b/vortex-datafusion/src/persistent/access_plan.rs index 2a523ce8e16..9cf4a5824d6 100644 --- a/vortex-datafusion/src/persistent/access_plan.rs +++ b/vortex-datafusion/src/persistent/access_plan.rs @@ -23,6 +23,11 @@ impl VortexAccessPlan { } impl VortexAccessPlan { + /// Returns the selection, if one was set. + pub fn selection(&self) -> Option<&Selection> { + self.selection.as_ref() + } + /// Apply the plan to the scan's builder. 
pub fn apply_to_builder(&self, mut scan_builder: ScanBuilder) -> ScanBuilder where diff --git a/vortex-datafusion/src/v2/mod.rs b/vortex-datafusion/src/v2/mod.rs new file mode 100644 index 00000000000..1e385ca3c85 --- /dev/null +++ b/vortex-datafusion/src/v2/mod.rs @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! An experimental implementation of the Vortex Scan API for DataFusion. +//! +//! This integration directly implements `TableProvider` + `ExecutionPlan`, bypassing DataFusion's +//! `FileFormat` abstraction. + +mod source; +mod table; + +pub use source::VortexDataSource; +pub use table::VortexTable; diff --git a/vortex-datafusion/src/v2/source.rs b/vortex-datafusion/src/v2/source.rs new file mode 100644 index 00000000000..7033289f075 --- /dev/null +++ b/vortex-datafusion/src/v2/source.rs @@ -0,0 +1,552 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`VortexDataSource`] implements DataFusion's [`DataSource`] trait, deferring scan construction +//! to [`DataSource::open`] so that pushed-down filters and limits are included in the +//! [`ScanRequest`]. A single DataFusion partition is used; Vortex handles internal parallelism +//! by driving splits concurrently via [`TryStreamExt::try_flatten_unordered`]. 
+ +use std::any::Any; +use std::fmt; +use std::fmt::Formatter; +use std::num::NonZero; +use std::num::NonZeroUsize; +use std::sync::Arc; + +use arrow_schema::DataType; +use arrow_schema::Schema; +use arrow_schema::SchemaRef; +use datafusion_common::ColumnStatistics; +use datafusion_common::DataFusionError; +use datafusion_common::Result as DFResult; +use datafusion_common::Statistics; +use datafusion_common::stats::Precision as DFPrecision; +use datafusion_datasource::source::DataSource; +use datafusion_execution::SendableRecordBatchStream; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr::utils::reassign_expr_columns; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::DisplayFormatType; +use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; +use datafusion_physical_plan::filter_pushdown::PushedDown; +use datafusion_physical_plan::stream::RecordBatchStreamAdapter; +use futures::StreamExt; +use futures::TryStreamExt; +use futures::future::try_join_all; +use vortex::array::VortexSessionExecute; +use vortex::array::arrow::ArrowArrayExecutor; +use vortex::dtype::DType; +use vortex::dtype::FieldPath; +use vortex::dtype::Nullability; +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::expr::Expression; +use vortex::expr::and as vx_and; +use vortex::expr::get_item; +use vortex::expr::pack; +use vortex::expr::root; +use vortex::expr::stats::Precision; +use vortex::expr::transform::replace; +use vortex::io::session::RuntimeSessionExt; +use vortex::scan::api::DataSourceRef; +use vortex::scan::api::ScanRequest; +use vortex::session::VortexSession; + +use crate::convert::exprs::DefaultExpressionConvertor; +use crate::convert::exprs::ExpressionConvertor; +use 
crate::convert::exprs::ProcessedProjection; +use crate::convert::exprs::make_vortex_predicate; +use crate::convert::stats::stats_set_to_df; + +/// A builder for a [`VortexDataSource`]. +pub struct VortexDataSourceBuilder { + data_source: DataSourceRef, + session: VortexSession, + + arrow_schema: Option, + projection: Option>, +} + +impl VortexDataSourceBuilder { + /// Manually configure an Arrow schema to use when reading from the Vortex source. + /// If not specified, the data source will infer an Arrow schema from the Vortex DType. + /// + /// Note that this schema is not validated against the Vortex DType so any errors will be + /// deferred until read time. + pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self { + self.arrow_schema = Some(arrow_schema); + self + } + + /// Configure an initial projection using top-level field indices. + pub fn with_projection(mut self, indices: Vec) -> Self { + self.projection = Some(indices); + self + } + + /// Configure an initial projection using top-level field indices. + pub fn with_some_projection(mut self, indices: Option>) -> Self { + self.projection = indices; + self + } + + /// Build the [`VortexDataSource`]. + /// + /// FIXME(ngates): Note that due to the DataFusion API, this function eagerly resolves + /// statistics for all projected columns. That said.. we only need to do this for aggregation + /// reductions. Any stats used for pruning are handled internally. We could possibly look + /// at the plan ourselves and decide whether there is any need for the stats? 
+ pub async fn build(self) -> VortexResult { + // The projection expression + let mut projection = root(); + + // Resolve the Arrow schema + let mut arrow_schema = match self.arrow_schema { + Some(schema) => schema, + None => { + let data_type = self.data_source.dtype().to_arrow_dtype()?; + let DataType::Struct(fields) = data_type else { + vortex_bail!("Expected a struct-like DataType, found {}", data_type); + }; + Arc::new(Schema::new(fields)) + } + }; + + // Apply any selection and create a projection expression. + if let Some(indices) = self.projection { + let fields = indices.iter().map(|&i| { + let name = arrow_schema.field(i).name().clone(); + let expr = get_item(name.as_str(), root()); + (name, expr) + }); + + // Update the projection expression + projection = pack(fields, Nullability::NonNullable); + + // Update the arrow schema + arrow_schema = Arc::new(Schema::new( + indices + .iter() + .map(|&i| arrow_schema.field(i).clone()) + .collect::>(), + )); + } + + let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else { + vortex_bail!("Projection does not evaluate to a struct"); + }; + + // We now compute initial statistics. + let field_paths: Vec<_> = fields + .names() + .iter() + .cloned() + .map(FieldPath::from_name) + .collect(); + let statistics = try_join_all( + field_paths + .iter() + .map(|path| self.data_source.field_statistics(path)), + ) + .await? 
+ .iter() + .zip(fields.fields()) + .map(|(stats, dtype)| stats_set_to_df(stats, &dtype)) + .collect::>>()?; + + Ok(VortexDataSource { + data_source: self.data_source, + session: self.session, + initial_schema: arrow_schema.clone(), + initial_projection: projection.clone(), + initial_statistics: statistics.clone(), + projected_projection: projection.clone(), + projected_schema: arrow_schema.clone(), + projected_statistics: statistics.clone(), + leftover_projection: None, + leftover_schema: arrow_schema, + leftover_statistics: statistics, + filter: None, + limit: None, + ordered: false, + num_partitions: std::thread::available_parallelism() + .unwrap_or(unsafe { NonZero::new_unchecked(1) }), + }) + } +} + +impl VortexDataSource { + /// Create a builder for a [`VortexDataSource`]. + pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder { + VortexDataSourceBuilder { + data_source, + session, + arrow_schema: None, + projection: None, + } + } +} + +/// A DataFusion [`DataSource`] that defers Vortex scan construction to [`open`](DataSource::open). +/// +/// Holds a [`DataSourceRef`] rather than pre-collected splits, so that filters and limits pushed +/// down by DataFusion's optimizer are included in the [`ScanRequest`]. A single DataFusion +/// partition is exposed; Vortex drives splits concurrently via +/// [`TryStreamExt::try_flatten_unordered`]. +#[derive(Clone)] +pub struct VortexDataSource { + /// The Vortex data source. + data_source: DataSourceRef, + /// Vortex session handle. + session: VortexSession, + + // --- Phase 1: Initial (from the builder, before any optimizer pushdown) --- + /// The Arrow schema of the data source before any DataFusion projection pushdown. + initial_schema: SchemaRef, + /// The initial Vortex projection expression (e.g. column selection from the builder). + initial_projection: Expression, + /// Column statistics for the initial projection columns. 
+ #[allow(dead_code)] + initial_statistics: Vec, + + // --- Phase 2: Projected (pushed into the Vortex scan) --- + /// The Vortex projection expression sent in the [`ScanRequest`]. + /// Composed with `initial_projection` so it operates on the original source columns. + projected_projection: Expression, + /// The Arrow schema of the Vortex scan output (before any leftover projection). + projected_schema: SchemaRef, + /// Column statistics for the projected (scan output) columns. + projected_statistics: Vec, + + // --- Phase 3: Leftover (applied by DataFusion after the scan) --- + /// DataFusion projection expressions that could not be pushed into the Vortex scan. + /// Applied after converting arrays to record batches in [`DataSource::open`]. + /// `None` when all projection expressions were successfully pushed down. + leftover_projection: Option, + /// The Arrow schema after applying the leftover projection. + /// This is the output schema seen by DataFusion. + leftover_schema: SchemaRef, + /// Column statistics matching `leftover_schema`. + leftover_statistics: Vec, + + /// An optional filter expression. + /// Populated by [`DataSource::try_pushdown_filters`] when DataFusion pushes filters down. + filter: Option, + /// An optional row limit populated by [`DataSource::with_fetch`]. + limit: Option, + /// Whether to preserve the order of the output rows. + ordered: bool, + + /// The requested partition count from DataFusion, populated by [`DataSource::repartitioned`]. + /// We use this as a hint for how many splits to execute concurrently in `open()`, but we + /// always declare to DataFusion that we only have a single partition so that we can + /// internally manage concurrency and fix the problem of partition skew. 
+ num_partitions: NonZeroUsize, +} + +impl fmt::Debug for VortexDataSource { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("VortexScanSource") + .field("schema", &self.leftover_schema) + .field("projection", &format!("{}", &self.projected_projection)) + .field("filter", &self.filter.as_ref().map(|e| format!("{}", e))) + .field("limit", &self.limit) + .finish() + } +} + +impl DataSource for VortexDataSource { + fn open( + &self, + partition: usize, + _context: Arc, + ) -> DFResult { + // VortexScanSource always uses a single partition since Vortex handles parallelism + // and concurrency internally. + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "VortexScanSource: expected partition 0, got {partition}" + ))); + } + + // Build the scan request with pushed-down projection, filter, and limit. + // The projection is included so the scan can prune columns at the I/O level. + let scan_request = ScanRequest { + projection: Some(self.projected_projection.clone()), + filter: self.filter.clone(), + limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)), + ordered: self.ordered, + ..Default::default() + }; + + let data_source = self.data_source.clone(); + let projected_schema = self.projected_schema.clone(); + let session = self.session.clone(); + let num_partitions = self.num_partitions; + + // Pre-build the leftover projector (if any) so we can apply it after batch conversion. + let leftover_projector = self + .leftover_projection + .as_ref() + .map(|proj| proj.make_projector(&self.projected_schema)) + .transpose()?; + + // Defer the async DataSource::scan() call to the first poll of the stream. + let stream = futures::stream::once(async move { + let scan = data_source + .scan(scan_request) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + // Each split.execute() returns a lazy stream whose early polls do preparation + // work (expression resolution, layout traversal, first I/O spawns). 
We use + // try_flatten_unordered to poll multiple split streams concurrently so that + // the next split is already warm when the current one finishes. + let scan_streams = scan.partitions().map(|split_result| { + let split = split_result?; + split.execute() + }); + + let handle = session.handle(); + let stream = scan_streams + .try_flatten_unordered(Some(num_partitions.get() * 2)) + .map(move |result| { + let session = session.clone(); + let schema = projected_schema.clone(); + handle.spawn_cpu(move || { + let mut ctx = session.create_execution_ctx(); + result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx)) + }) + }) + .buffered(num_partitions.get()) + .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e)))); + + // Apply leftover projection (expressions that couldn't be pushed into Vortex). + let stream = if let Some(projector) = leftover_projector { + stream + .map(move |batch_result| { + batch_result.and_then(|batch| projector.project_batch(&batch)) + }) + .boxed() + } else { + stream.boxed() + }; + + Ok::<_, DataFusionError>(stream) + }) + .try_flatten(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.leftover_schema.clone(), + stream, + ))) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + write!( + f, + "VortexScanSource: projection={}", + self.projected_projection + )?; + if let Some(ref filter) = self.filter { + write!(f, ", filter={filter}")?; + } + if let Some(limit) = self.limit { + write!(f, ", limit={limit}")?; + } + Ok(()) + } + + fn repartitioned( + &self, + target_partitions: usize, + _repartition_file_min_size: usize, + output_ordering: Option, + ) -> DFResult>> { + // Vortex handles parallelism internally — always use a single partition. 
+ let mut this = self.clone(); + this.num_partitions = NonZero::new(target_partitions) + .ok_or_else(|| DataFusionError::Internal("non-zero partitions".to_string()))?; + this.ordered |= output_ordering.is_some(); + Ok(Some(Arc::new(this))) + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn eq_properties(&self) -> EquivalenceProperties { + EquivalenceProperties::new(self.leftover_schema.clone()) + } + + fn partition_statistics(&self, _partition: Option) -> DFResult { + // FIXME(ngates): this should be adjusted based on filters. See DuckDB for heuristics, + // and in the future, store the selectivity stats in the session. + let num_rows = estimate_to_df_precision(&self.data_source.row_count()); + + // FIXME(ngates): byte size should be adjusted for the initial projection... + let total_byte_size = estimate_to_df_precision(&self.data_source.byte_size()); + + // Column statistics must match the output schema (leftover_schema), which may differ + // from the initial schema after try_swapping_with_projection adds computed columns. + let column_statistics = self.leftover_statistics.clone(); + + Ok(Statistics { + num_rows, + total_byte_size, + column_statistics, + }) + } + + fn with_fetch(&self, limit: Option) -> Option> { + let mut this = self.clone(); + this.limit = limit; + Some(Arc::new(this)) + } + + fn fetch(&self) -> Option { + self.limit + } + + // Note that we're explicitly "swapping" the projection. That means everything we do must + // be computed over the original input schema, rather than the projected output schema. 
+ fn try_swapping_with_projection( + &self, + projection: &ProjectionExprs, + ) -> DFResult>> { + tracing::debug!( + "VortexScanSource: trying to swap with projection: {}", + projection + ); + + let convertor = DefaultExpressionConvertor::default(); + let input_schema = self.initial_schema.as_ref(); + let projected_schema = projection.project_schema(input_schema)?; + + // Use the shared ExpressionConvertor to split the projection into a Vortex + // scan_projection and a leftover DataFusion projection for expressions that + // can't be pushed down (e.g., unsupported scalar functions, decimal binary). + let ProcessedProjection { + scan_projection, + leftover_projection, + } = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?; + + // Compose with the initial projection so the scan operates on the original + // source columns, not the initial projection's output columns. + let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone()); + + // Compute the scan output schema from the Vortex expression's return dtype. + let scan_dtype = scan_projection + .return_dtype(self.data_source.dtype()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let scan_arrow_type = scan_dtype + .to_arrow_dtype() + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let DataType::Struct(scan_fields) = scan_arrow_type else { + return Err(DataFusionError::Internal( + "Scan projection must produce a struct type".to_string(), + )); + }; + let scan_output_schema = Arc::new(Schema::new(scan_fields)); + + // Remap the leftover column references to match the scan output schema. 
+ let leftover_projection = leftover_projection + .try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?; + + let final_schema = Arc::new(projected_schema); + + let mut this = self.clone(); + this.projected_projection = scan_projection; + this.projected_schema = scan_output_schema.clone(); + this.projected_statistics = + vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()]; + this.leftover_projection = Some(leftover_projection); + this.leftover_schema = final_schema.clone(); + this.leftover_statistics = + vec![ColumnStatistics::new_unknown(); final_schema.fields().len()]; + + Ok(Some(Arc::new(this))) + } + + fn try_pushdown_filters( + &self, + filters: Vec>, + _config: &datafusion_common::config::ConfigOptions, + ) -> DFResult>> { + if filters.is_empty() { + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![], + )); + } + + let convertor = DefaultExpressionConvertor::default(); + let input_schema = self.initial_schema.as_ref(); + + // Classify each filter: pushable filters are passed into the ScanRequest in open(), + // so we can safely claim PushedDown::Yes for them. + let pushdown_results: Vec = filters + .iter() + .map(|expr| { + if convertor.can_be_pushed_down(expr, input_schema) { + PushedDown::Yes + } else { + PushedDown::No + } + }) + .collect(); + + // If nothing can be pushed down, return early. + if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) { + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + pushdown_results, + )); + } + + // Collect the pushable filter expressions. + let pushable: Vec> = filters + .iter() + .zip(pushdown_results.iter()) + .filter_map(|(expr, pushed)| match pushed { + PushedDown::Yes => Some(expr.clone()), + PushedDown::No => None, + }) + .collect(); + + // Convert to Vortex conjunction. + let vortex_pred = make_vortex_predicate(&convertor, &pushable)?; + + // Combine with existing filter. 
+ let new_filter = match (&self.filter, vortex_pred) { + (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)), + (Some(existing), None) => Some(existing.clone()), + (None, Some(new_pred)) => Some(new_pred), + (None, None) => None, + }; + + let mut this = self.clone(); + this.filter = new_filter; + Ok( + FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results) + .with_updated_node(Arc::new(this) as _), + ) + } +} + +/// Convert a Vortex [`Option`] to a DataFusion [`Precision`](DFPrecision). +fn estimate_to_df_precision(est: &Option>) -> DFPrecision { + match est { + Some(Precision::Exact(v)) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)), + Some(Precision::Inexact(v)) => { + DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX)) + } + None => DFPrecision::Absent, + } +} diff --git a/vortex-datafusion/src/v2/table.rs b/vortex-datafusion/src/v2/table.rs new file mode 100644 index 00000000000..5583a6fa270 --- /dev/null +++ b/vortex-datafusion/src/v2/table.rs @@ -0,0 +1,135 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! [`VortexTable`] implements DataFusion's [`TableProvider`] trait, providing a direct +//! integration between a Vortex [`DataSource`] and DataFusion's query engine. 
+ +use std::any::Any; +use std::fmt; +use std::sync::Arc; + +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion_catalog::Session; +use datafusion_catalog::TableProvider; +use datafusion_common::ColumnStatistics; +use datafusion_common::DataFusionError; +use datafusion_common::Result as DFResult; +use datafusion_common::Statistics; +use datafusion_common::stats::Precision; +use datafusion_datasource::source::DataSourceExec; +use datafusion_expr::Expr; +use datafusion_expr::TableType; +use datafusion_physical_plan::ExecutionPlan; +use vortex::scan::api::DataSourceRef; +use vortex::session::VortexSession; + +use crate::v2::source::VortexDataSource; + +/// A DataFusion [`TableProvider`] backed by a Vortex [`DataSourceRef`]. +pub struct VortexTable { + data_source: DataSourceRef, + session: VortexSession, + arrow_schema: SchemaRef, +} + +impl fmt::Debug for VortexTable { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("VortexTable") + .field("schema", &self.arrow_schema) + .finish() + } +} + +impl VortexTable { + /// Creates a new [`VortexTable`] from a Vortex data source and session. + /// + /// The Arrow schema will be used to emit the correct column names and types to DataFusion. + /// The Vortex DType of the data source should be compatible with this Arrow schema. + pub fn new( + data_source: DataSourceRef, + session: VortexSession, + arrow_schema: SchemaRef, + ) -> Self { + Self { + data_source, + session, + arrow_schema, + } + } +} + +#[async_trait] +impl TableProvider for VortexTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.arrow_schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + // Construct the physical node representing this table. 
+ let data_source = VortexDataSource::builder(self.data_source.clone(), self.session.clone()) + .with_arrow_schema(self.arrow_schema.clone()) + // We push down the projection now since it can make building the physical plan a lot + // cheaper, e.g. by only computing stats for the projected columns. + .with_some_projection(projection.cloned()) + // We don't push down filters for two reasons: + // 1. Vortex requires a physical expression, not logical. DataFusion will try to push + // the physical filters later. + // 2. There's nothing useful we can do with filters now to reduce the amount of work + // we have to do. + // + // We also don't push down the limit for the same reason, there's nothing useful we + // can do with it. + .build() + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + Ok(DataSourceExec::from_data_source(data_source)) + } + + /// Returns statistics for the full table, prior to any projection. + /// + /// We should not (and actually, cannot) perform I/O here, so the best we can do is return + /// cardinality and byte size estimates. + /// + // NOTE(ngates): it's not obvious these are actually used? I think DataFusion does join + // planning over stats from the physical plan? 
+ fn statistics(&self) -> Option { + let num_rows = match self.data_source.row_count() { + Some(vortex::expr::stats::Precision::Exact(v)) => { + usize::try_from(v).map(Precision::Exact).unwrap_or_default() + } + _ => Precision::Absent, + }; + + let total_byte_size = match self.data_source.byte_size() { + Some(vortex::expr::stats::Precision::Exact(v)) => { + usize::try_from(v).map(Precision::Exact).unwrap_or_default() + } + _ => Precision::Absent, + }; + + let column_statistics = + vec![ColumnStatistics::new_unknown(); self.arrow_schema.fields.len()]; + + Some(Statistics { + num_rows, + total_byte_size, + column_statistics, + }) + } +} diff --git a/vortex-duckdb/Cargo.toml b/vortex-duckdb/Cargo.toml index b8f0e97a629..d84096e45d0 100644 --- a/vortex-duckdb/Cargo.toml +++ b/vortex-duckdb/Cargo.toml @@ -30,6 +30,7 @@ async-trait = { workspace = true } bitvec = { workspace = true } custom-labels = { workspace = true } futures = { workspace = true } +glob = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } object_store = { workspace = true, features = ["aws"] } diff --git a/vortex-duckdb/src/duckdb/table_function/init.rs b/vortex-duckdb/src/duckdb/table_function/init.rs index 7d72bf10d65..c8a143a98e2 100644 --- a/vortex-duckdb/src/duckdb/table_function/init.rs +++ b/vortex-duckdb/src/duckdb/table_function/init.rs @@ -75,7 +75,7 @@ impl Debug for TableInitInput<'_, T> { .field("table_function", &std::any::type_name::()) .field("column_ids", &self.column_ids()) .field("projection_ids", &self.projection_ids()) - // .field("table_filter_set", &self.table_filter_set()) + .field("table_filter_set", &self.table_filter_set()) .finish() } } diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index 688a7cf40d4..6f3f27f20d6 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -151,6 +151,7 @@ pub trait TableFunction: Sized + 
Debug { // TODO(ngates): there are many more callbacks that can be configured. } +#[derive(Debug)] pub enum Cardinality { /// Completely unknown cardinality. Unknown, diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index dfc93480de6..a00c61fe319 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -21,12 +21,14 @@ use crate::duckdb::DatabaseRef; use crate::duckdb::LogicalType; use crate::duckdb::Value; use crate::scan::VortexTableFunction; +use crate::scan_api::VortexScanApiTableFunction; mod convert; pub mod duckdb; mod exporter; mod filesystem; mod scan; +mod scan_api; #[rustfmt::skip] #[path = "./cpp.rs"] @@ -53,8 +55,13 @@ pub fn initialize(db: &DatabaseRef) -> VortexResult<()> { LogicalType::varchar(), Value::from("vortex"), )?; - db.register_table_function::(c"vortex_scan")?; - db.register_table_function::(c"read_vortex")?; + if std::env::var("VORTEX_USE_SCAN_API").is_ok_and(|v| v == "1") { + db.register_table_function::(c"vortex_scan")?; + db.register_table_function::(c"read_vortex")?; + } else { + db.register_table_function::(c"vortex_scan")?; + db.register_table_function::(c"read_vortex")?; + } db.register_copy_function::(c"vortex", c"vortex") } diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index fa8aaade15a..58e3a90fd4c 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -118,12 +118,101 @@ pub struct VortexGlobalData { } impl VortexGlobalData { + pub(crate) fn new( + iterator: ThreadSafeIterator)>>, + ) -> Self { + Self { + iterator, + batch_id: AtomicU64::new(0), + ctx: ExecutionCtx::new(VortexSession::default()), + bytes_total: Arc::new(AtomicU64::new(0)), + bytes_read: AtomicU64::new(0), + } + } + pub fn progress(&self) -> f64 { let read = self.bytes_read.load(Ordering::Relaxed); let mut total = self.bytes_total.load(Ordering::Relaxed); total += (total == 0) as u64; read as f64 / total as f64 * 100. // return 100. 
when nothing is read } + + pub(crate) fn new_local(&self) -> VortexLocalData { + VortexLocalData { + iterator: self.iterator.clone(), + exporter: None, + batch_id: None, + } + } + + /// Shared scan logic: pulls arrays from the thread-safe iterator, converts them to struct + /// arrays, and exports them into DuckDB data chunks. + pub(crate) fn scan( + &mut self, + local_state: &mut VortexLocalData, + chunk: &mut DataChunkRef, + ) -> VortexResult<()> { + loop { + if local_state.exporter.is_none() { + let Some(result) = local_state.iterator.next() else { + return Ok(()); + }; + let (array_result, conversion_cache) = result?; + + let array_result = array_result.optimize_recursive()?; + let array_result = if let Some(array) = array_result.as_opt::() { + array.clone() + } else if let Some(array) = array_result.as_opt::() + && let Some(pack_options) = array.scalar_fn().as_opt::() + { + StructArray::new( + pack_options.names.clone(), + array.children(), + array.len(), + pack_options.nullability.into(), + ) + } else { + array_result + .execute::(&mut self.ctx)? + .into_struct() + }; + + local_state.exporter = Some(ArrayExporter::try_new( + &array_result, + &conversion_cache, + &mut self.ctx, + )?); + // Relaxed since there is no intra-instruction ordering required. + local_state.batch_id = Some(self.batch_id.fetch_add(1, Ordering::Relaxed)); + } + + let exporter = local_state + .exporter + .as_mut() + .vortex_expect("error: exporter missing"); + + let has_more_data = exporter.export(chunk)?; + self.bytes_read.fetch_add(chunk.len(), Ordering::Relaxed); + + if !has_more_data { + // This exporter is fully consumed. 
+ local_state.exporter = None; + local_state.batch_id = None; + } else { + break; + } + } + + assert!(!chunk.is_empty()); + + Ok(()) + } + + pub(crate) fn partition_data(local_state: &VortexLocalData) -> VortexResult { + local_state + .batch_id + .ok_or_else(|| vortex_err!("batch id missing, no batches exported")) + } } pub struct VortexLocalData { @@ -136,13 +225,10 @@ pub struct VortexLocalData { #[derive(Debug)] pub struct VortexTableFunction; -/// Extracts the schema from a Vortex file. -fn extract_schema_from_vortex_file( - file: &VortexFile, +/// Extracts DuckDB column names and logical types from a Vortex struct DType. +pub(crate) fn extract_schema_from_dtype( + dtype: &vortex::dtype::DType, ) -> VortexResult<(Vec, Vec)> { - let dtype = file.dtype(); - - // For now, we assume the top-level type to be a struct. let struct_dtype = dtype .as_struct_fields_opt() .ok_or_else(|| vortex_err!("Vortex file must contain a struct array at the top level"))?; @@ -218,8 +304,28 @@ fn extract_table_filter_expr( // This is used by duckdb whenever there is no projection id in a logical_get node. // For some reason we cannot return an empty DataChunk and duckdb will look for the virtual column // with this index and create a data chunk with a single vector of that type. -static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; -static EMPTY_COLUMN_NAME: &str = ""; +pub(crate) static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; +pub(crate) static EMPTY_COLUMN_NAME: &str = ""; + +/// Shared local state initialization for both `VortexTableFunction` and `VortexScanApiTableFunction`. 
+pub(crate) fn init_local_shared(global: &mut VortexGlobalData) -> VortexResult { + unsafe { + use custom_labels::sys; + + if sys::current().is_null() { + let ls = sys::new(0); + sys::replace(ls); + }; + } + + let global_labels = get_global_labels(); + + for (key, value) in global_labels { + CURRENT_LABELSET.set(key, value); + } + + Ok(global.new_local()) +} impl TableFunction for VortexTableFunction { type BindData = VortexBindData; @@ -321,7 +427,7 @@ impl TableFunction for VortexTableFunction { VortexResult::Ok(file) })?; - let (column_names, column_types) = extract_schema_from_vortex_file(&first_file)?; + let (column_names, column_types) = extract_schema_from_dtype(first_file.dtype())?; // Add result columns based on the extracted schema. for (column_name, column_type) in column_names.iter().zip(&column_types) { @@ -345,62 +451,7 @@ impl TableFunction for VortexTableFunction { global_state: &mut Self::GlobalState, chunk: &mut DataChunkRef, ) -> VortexResult<()> { - loop { - if local_state.exporter.is_none() { - let Some(result) = local_state.iterator.next() else { - return Ok(()); - }; - let (array_result, conversion_cache) = result?; - - let array_result = array_result.optimize_recursive()?; - let array_result = if let Some(array) = array_result.as_opt::() { - array.clone() - } else if let Some(array) = array_result.as_opt::() - && let Some(pack_options) = array.scalar_fn().as_opt::() - { - StructArray::new( - pack_options.names.clone(), - array.children(), - array.len(), - pack_options.nullability.into(), - ) - } else { - array_result - .execute::(&mut global_state.ctx)? - .into_struct() - }; - - local_state.exporter = Some(ArrayExporter::try_new( - &array_result, - &conversion_cache, - &mut global_state.ctx, - )?); - // Relaxed since there is no intra-instruction ordering required. 
- local_state.batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); - } - - let exporter = local_state - .exporter - .as_mut() - .vortex_expect("error: exporter missing"); - - let has_more_data = exporter.export(chunk)?; - global_state - .bytes_read - .fetch_add(chunk.len(), Ordering::Relaxed); - - if !has_more_data { - // This exporter is fully consumed. - local_state.exporter = None; - local_state.batch_id = None; - } else { - break; - } - } - - assert!(!chunk.is_empty()); - - Ok(()) + global_state.scan(local_state, chunk) } fn init_global(init_input: &TableInitInput) -> VortexResult { @@ -504,26 +555,7 @@ impl TableFunction for VortexTableFunction { _init: &TableInitInput, global: &mut Self::GlobalState, ) -> VortexResult { - unsafe { - use custom_labels::sys; - - if sys::current().is_null() { - let ls = sys::new(0); - sys::replace(ls); - }; - } - - let global_labels = get_global_labels(); - - for (key, value) in global_labels { - CURRENT_LABELSET.set(key, value); - } - - Ok(VortexLocalData { - iterator: global.iterator.clone(), - exporter: None, - batch_id: None, - }) + init_local_shared(global) } fn table_scan_progress( @@ -562,11 +594,9 @@ impl TableFunction for VortexTableFunction { fn partition_data( _bind_data: &Self::BindData, _global_init_data: &mut Self::GlobalState, - _local_init_data: &mut Self::LocalState, + local_init_data: &mut Self::LocalState, ) -> VortexResult { - _local_init_data - .batch_id - .ok_or_else(|| vortex_err!("batch id missing, no batches exported")) + VortexGlobalData::partition_data(local_init_data) } fn to_string(bind_data: &Self::BindData) -> Option> { diff --git a/vortex-duckdb/src/scan_api.rs b/vortex-duckdb/src/scan_api.rs new file mode 100644 index 00000000000..07f4baf0b91 --- /dev/null +++ b/vortex-duckdb/src/scan_api.rs @@ -0,0 +1,332 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt; +use std::fmt::Debug; +use std::fmt::Formatter; +use 
std::path::Path; +use std::sync::Arc; + +use futures::StreamExt; +use futures::TryStreamExt; +use itertools::Itertools; +use num_traits::AsPrimitive; +use url::Url; +use vortex::dtype::FieldNames; +use vortex::error::VortexExpect; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::expr::Expression; +use vortex::expr::and_collect; +use vortex::expr::col; +use vortex::expr::root; +use vortex::expr::select; +use vortex::expr::stats::Precision; +use vortex::file::multi::MultiFileDataSource; +use vortex::io::runtime::BlockingRuntime; +use vortex::scan::api::DataSourceRef; +use vortex::scan::api::ScanRequest; +use vortex_utils::aliases::hash_set::HashSet; + +use crate::RUNTIME; +use crate::SESSION; +use crate::convert::try_from_bound_expression; +use crate::convert::try_from_table_filter; +use crate::duckdb::BindInputRef; +use crate::duckdb::BindResultRef; +use crate::duckdb::Cardinality; +use crate::duckdb::ClientContextRef; +use crate::duckdb::DataChunkRef; +use crate::duckdb::ExpressionRef; +use crate::duckdb::LogicalType; +use crate::duckdb::TableFunction; +use crate::duckdb::TableInitInput; +use crate::duckdb::VirtualColumnsResultRef; +use crate::exporter::ConversionCache; +use crate::filesystem::resolve_filesystem; +use crate::scan::EMPTY_COLUMN_IDX; +use crate::scan::EMPTY_COLUMN_NAME; +use crate::scan::VortexGlobalData; +use crate::scan::VortexLocalData; +use crate::scan::extract_schema_from_dtype; +use crate::scan::init_local_shared; + +/// Bind data for the scan API table function, holding a [`DataSourceRef`] instead of +/// per-file URLs. +pub struct VortexScanApiBindData { + data_source: DataSourceRef, + filter_exprs: Vec, + column_names: Vec, + column_types: Vec, +} + +impl Clone for VortexScanApiBindData { + fn clone(&self) -> Self { + Self { + data_source: self.data_source.clone(), + // filter_exprs are consumed once in `init_global`. 
+ filter_exprs: vec![], + column_names: self.column_names.clone(), + column_types: self.column_types.clone(), + } + } +} + +impl Debug for VortexScanApiBindData { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("VortexScanApiBindData") + .field("column_names", &self.column_names) + .field("column_types", &self.column_types) + .field( + "filter_exprs", + &self + .filter_exprs + .iter() + .map(|e| e.to_string()) + .collect_vec(), + ) + .finish() + } +} + +#[derive(Debug)] +pub struct VortexScanApiTableFunction; + +/// Creates a projection expression from the table initialization input. +fn extract_projection_expr(init: &TableInitInput) -> Expression { + let projection_ids = init.projection_ids().unwrap_or(&[]); + let column_ids = init.column_ids(); + + select( + projection_ids + .iter() + .map(|p| { + let idx: usize = p.as_(); + let val: usize = column_ids[idx].as_(); + val + }) + .map(|idx| { + init.bind_data() + .column_names + .get(idx) + .vortex_expect("prune idx in column names") + }) + .map(|s| Arc::from(s.as_str())) + .collect::(), + root(), + ) +} + +/// Creates a table filter expression from the table filter set. +fn extract_table_filter_expr( + init: &TableInitInput, + column_ids: &[u64], +) -> VortexResult> { + let mut table_filter_exprs: HashSet = if let Some(filter) = init.table_filter_set() + { + filter + .into_iter() + .map(|(idx, ex)| { + let idx_u: usize = idx.as_(); + let col_idx: usize = column_ids[idx_u].as_(); + let name = init + .bind_data() + .column_names + .get(col_idx) + .vortex_expect("exists"); + try_from_table_filter( + ex, + &col(name.as_str()), + init.bind_data().data_source.dtype(), + ) + }) + .collect::>>>()? 
+ .unwrap_or_else(HashSet::new) + } else { + HashSet::new() + }; + + table_filter_exprs.extend(init.bind_data().filter_exprs.clone()); + Ok(and_collect(table_filter_exprs.into_iter().collect_vec())) +} + +impl TableFunction for VortexScanApiTableFunction { + type BindData = VortexScanApiBindData; + type GlobalState = VortexGlobalData; + type LocalState = VortexLocalData; + + const PROJECTION_PUSHDOWN: bool = true; + const FILTER_PUSHDOWN: bool = true; + const FILTER_PRUNE: bool = true; + + fn parameters() -> Vec { + vec![LogicalType::varchar()] + } + + fn bind( + ctx: &ClientContextRef, + input: &BindInputRef, + result: &mut BindResultRef, + ) -> VortexResult { + let glob_url_parameter = input + .get_parameter(0) + .ok_or_else(|| vortex_err!("Missing file glob parameter"))?; + + // Parse the URL and separate the base URL (keep scheme, host, etc.) from the path. + let glob_url_str = glob_url_parameter.as_string(); + let glob_url = match Url::parse(glob_url_str.as_str()) { + Ok(url) => Ok(url), + Err(_) => Url::from_file_path(Path::new(glob_url_str.as_str())) + .map_err(|_| vortex_err!("Neither URL nor path: '{}' ", glob_url_str.as_str())), + }?; + + let mut base_url = glob_url.clone(); + base_url.set_path(""); + + let fs = resolve_filesystem(&base_url, ctx)?; + + let data_source: DataSourceRef = RUNTIME.block_on(async { + let builder = MultiFileDataSource::new(SESSION.clone()) + .with_filesystem(fs) + .with_glob(glob_url.path()); + let ds = builder.build().await?; + VortexResult::Ok(Arc::new(ds)) + })?; + + let (column_names, column_types) = extract_schema_from_dtype(data_source.dtype())?; + + for (column_name, column_type) in column_names.iter().zip(&column_types) { + result.add_result_column(column_name, column_type); + } + + Ok(VortexScanApiBindData { + data_source, + filter_exprs: vec![], + column_names, + column_types, + }) + } + + fn scan( + _client_context: &ClientContextRef, + _bind_data: &Self::BindData, + local_state: &mut Self::LocalState, + global_state: 
&mut Self::GlobalState, + chunk: &mut DataChunkRef, + ) -> VortexResult<()> { + global_state.scan(local_state, chunk) + } + + fn init_global(init_input: &TableInitInput) -> VortexResult { + let bind_data = init_input.bind_data(); + let projection_expr = extract_projection_expr(init_input); + let filter_expr = extract_table_filter_expr(init_input, init_input.column_ids())?; + + tracing::debug!( + "Global init Vortex scan_api SELECT {} WHERE {}", + &projection_expr, + filter_expr + .as_ref() + .map_or_else(|| "true".to_string(), |f| f.to_string()) + ); + + let request = ScanRequest { + projection: Some(projection_expr), + filter: filter_expr, + ..Default::default() + }; + + let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?; + let conversion_cache = Arc::new(ConversionCache::new(0)); + + let num_workers = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + // Each split.execute() returns a lazy stream whose early polls do preparation + // work (expression resolution, layout traversal, first I/O spawns). We use + // try_flatten_unordered to poll multiple split streams concurrently so that + // the next split is already warm when the current one finishes. 
+ let scan_streams = scan.partitions().map(move |split_result| { + let cache = conversion_cache.clone(); + let split = split_result?; + let s = split.execute()?; + VortexResult::Ok(s.map(move |r| Ok((r?, cache.clone()))).boxed()) + }); + + let iterator = RUNTIME.block_on_stream_thread_safe(move |_| { + scan_streams.try_flatten_unordered(Some(num_workers * 2)) + }); + + Ok(VortexGlobalData::new(iterator)) + } + + fn init_local( + _init: &TableInitInput, + global: &mut Self::GlobalState, + ) -> VortexResult { + init_local_shared(global) + } + + fn table_scan_progress( + _client_context: &ClientContextRef, + _bind_data: &mut Self::BindData, + global_state: &mut Self::GlobalState, + ) -> f64 { + global_state.progress() + } + + fn pushdown_complex_filter( + bind_data: &mut Self::BindData, + expr: &ExpressionRef, + ) -> VortexResult { + tracing::debug!("Attempting to push down filter expression: {expr}"); + let Some(expr) = try_from_bound_expression(expr)? else { + return Ok(false); + }; + bind_data.filter_exprs.push(expr); + + // NOTE(ngates): Vortex does indeed run exact filters, so in theory we should return `true` + // here to tell DuckDB we've handled the filter. However, DuckDB applies some crude + // cardinality estimation heuristics (e.g. an equality filter => 20% selectivity) that + // means by returning false, DuckDB runs an additional filter (a little bit of overhead) + // but tends to end up with a better query plan. + // If we plumb row count estimation into the layout tree, perhaps we could use zone maps + // etc. to return estimates. But this function is probably called too late anyway. Maybe + // we need our own cardinality heuristics. 
+ Ok(false) + } + + fn cardinality(bind_data: &Self::BindData) -> Cardinality { + match bind_data.data_source.row_count() { + Some(Precision::Exact(v)) => Cardinality::Maximum(v), + Some(Precision::Inexact(v)) => Cardinality::Estimate(v), + None => Cardinality::Unknown, + } + } + + fn partition_data( + _bind_data: &Self::BindData, + _global_init_data: &mut Self::GlobalState, + local_init_data: &mut Self::LocalState, + ) -> VortexResult { + VortexGlobalData::partition_data(local_init_data) + } + + fn to_string(bind_data: &Self::BindData) -> Option> { + let mut result = Vec::new(); + + result.push(("Function".to_string(), "Vortex Scan (scan API)".to_string())); + + if !bind_data.filter_exprs.is_empty() { + let mut filters = bind_data.filter_exprs.iter().map(|f| format!("{}", f)); + result.push(("Filters".to_string(), filters.join(" /\\\n"))); + } + + Some(result) + } + + fn virtual_columns(_bind_data: &Self::BindData, result: &mut VirtualColumnsResultRef) { + result.register(EMPTY_COLUMN_IDX, EMPTY_COLUMN_NAME, &LogicalType::bool()); + } +} diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index c69e21c6af1..8b7eb2aadf2 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -25,12 +25,14 @@ getrandom_v03 = { workspace = true } # Needed to pi glob = { workspace = true } itertools = { workspace = true } kanal = { workspace = true } +moka = { workspace = true, features = ["sync"] } object_store = { workspace = true, optional = true } oneshot.workspace = true parking_lot = { workspace = true } pin-project-lite = { workspace = true } tokio = { workspace = true, features = ["rt"], optional = true } tracing = { workspace = true } +url = { workspace = true } uuid = { workspace = true } # Needed to pickup the "js" feature for wasm targets from the workspace configuration vortex-alp = { workspace = true } vortex-array = { workspace = true } @@ -46,6 +48,7 @@ vortex-flatbuffers = { workspace = true, features = ["file"] } vortex-fsst = { workspace = true 
} vortex-io = { workspace = true } vortex-layout = { workspace = true } +vortex-mask = { workspace = true } vortex-metrics = { workspace = true } vortex-pco = { workspace = true } vortex-runend = { workspace = true } @@ -61,6 +64,7 @@ vortex-zstd = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } vortex-array = { workspace = true, features = ["_test-harness"] } vortex-io = { workspace = true, features = ["tokio"] } +vortex-layout = { workspace = true, features = ["_test-harness"] } vortex-scan = { workspace = true } [lints] diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index d0cc2ef5111..599898a7007 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -1,5 +1,21 @@ pub mod vortex_file +pub mod vortex_file::multi + +pub struct vortex_file::multi::MultiFileDataSource + +impl vortex_file::multi::MultiFileDataSource + +pub async fn vortex_file::multi::MultiFileDataSource::build(self) -> vortex_error::VortexResult + +pub fn vortex_file::multi::MultiFileDataSource::new(session: vortex_session::VortexSession) -> Self + +pub fn vortex_file::multi::MultiFileDataSource::with_filesystem(self, fs: vortex_io::filesystem::FileSystemRef) -> Self + +pub fn vortex_file::multi::MultiFileDataSource::with_glob(self, glob: impl core::convert::Into) -> Self + +pub fn vortex_file::multi::MultiFileDataSource::with_open_options(self, f: impl core::ops::function::Fn(vortex_file::VortexOpenOptions) -> vortex_file::VortexOpenOptions + core::marker::Send + core::marker::Sync + 'static) -> Self + pub mod vortex_file::segments pub enum vortex_file::segments::ReadEvent @@ -48,6 +64,30 @@ impl vortex_file::segments::RequestMetrics pub fn vortex_file::segments::RequestMetrics::new(metrics_registry: &dyn vortex_metrics::MetricsRegistry, labels: alloc::vec::Vec) -> Self +pub mod vortex_file::v2 + +pub struct vortex_file::v2::FileStatsLayoutReader + +impl vortex_file::v2::FileStatsLayoutReader + +pub fn 
vortex_file::v2::FileStatsLayoutReader::new(child: vortex_layout::reader::LayoutReaderRef, file_stats: vortex_file::FileStatistics, session: vortex_session::VortexSession) -> Self + +impl vortex_layout::reader::LayoutReader for vortex_file::v2::FileStatsLayoutReader + +pub fn vortex_file::v2::FileStatsLayoutReader::dtype(&self) -> &vortex_array::dtype::DType + +pub fn vortex_file::v2::FileStatsLayoutReader::filter_evaluation(&self, row_range: &core::ops::range::Range, expr: &vortex_array::expr::expression::Expression, mask: vortex_array::mask_future::MaskFuture) -> vortex_error::VortexResult + +pub fn vortex_file::v2::FileStatsLayoutReader::name(&self) -> &alloc::sync::Arc + +pub fn vortex_file::v2::FileStatsLayoutReader::projection_evaluation(&self, row_range: &core::ops::range::Range, expr: &vortex_array::expr::expression::Expression, mask: vortex_array::mask_future::MaskFuture) -> vortex_error::VortexResult + +pub fn vortex_file::v2::FileStatsLayoutReader::pruning_evaluation(&self, row_range: &core::ops::range::Range, expr: &vortex_array::expr::expression::Expression, mask: vortex_mask::Mask) -> vortex_error::VortexResult + +pub fn vortex_file::v2::FileStatsLayoutReader::register_splits(&self, field_mask: &[vortex_array::dtype::field_mask::FieldMask], row_range: &core::ops::range::Range, splits: &mut alloc::collections::btree::set::BTreeSet) -> vortex_error::VortexResult<()> + +pub fn vortex_file::v2::FileStatsLayoutReader::row_count(&self) -> u64 + pub enum vortex_file::DeserializeStep pub vortex_file::DeserializeStep::Done(vortex_file::Footer) @@ -210,6 +250,8 @@ impl vortex_file::VortexFile pub fn vortex_file::VortexFile::can_prune(&self, filter: &vortex_array::expr::expression::Expression) -> vortex_error::VortexResult +pub fn vortex_file::VortexFile::data_source(&self) -> vortex_error::VortexResult + pub fn vortex_file::VortexFile::dtype(&self) -> &vortex_array::dtype::DType pub fn vortex_file::VortexFile::file_stats(&self) -> 
core::option::Option<&vortex_file::FileStatistics> diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index 43d28f985d8..d113cc8cf98 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -25,12 +25,15 @@ use vortex_layout::LayoutReader; use vortex_layout::segments::SegmentSource; use vortex_scan::ScanBuilder; use vortex_scan::SplitBy; +use vortex_scan::api::DataSourceRef; +use vortex_scan::layout::LayoutReaderDataSource; use vortex_session::VortexSession; use vortex_utils::aliases::hash_map::HashMap; use crate::FileStatistics; use crate::footer::Footer; use crate::pruning::extract_relevant_file_stats_as_struct_row; +use crate::v2::FileStatsLayoutReader; /// Represents a Vortex file, providing access to its metadata and content. /// @@ -87,6 +90,25 @@ impl VortexFile { .new_reader("".into(), segment_source, &self.session) } + /// Create a [`DataSource`](vortex_scan::api::DataSource) from this file for scanning. + /// + /// Wraps the file's layout reader with [`FileStatsLayoutReader`] (when file-level + /// statistics are available) and [`LayoutReaderDataSource`]. + pub fn data_source(&self) -> VortexResult { + let mut reader = self.layout_reader()?; + if let Some(stats) = self.file_stats().cloned() { + reader = Arc::new(FileStatsLayoutReader::new( + reader, + stats, + self.session.clone(), + )); + } + Ok(Arc::new(LayoutReaderDataSource::new( + reader, + self.session.clone(), + ))) + } + /// Initiate a scan of the file, returning a builder for configuring the scan. pub fn scan(&self) -> VortexResult> { Ok(ScanBuilder::new( diff --git a/vortex-file/src/footer/mod.rs b/vortex-file/src/footer/mod.rs index 8cd54ed5d36..4142a60ded9 100644 --- a/vortex-file/src/footer/mod.rs +++ b/vortex-file/src/footer/mod.rs @@ -68,6 +68,11 @@ impl Footer { } } + pub(crate) fn with_approx_byte_size(mut self, approx_byte_size: usize) -> Self { + self.approx_byte_size = Some(approx_byte_size); + self + } + /// Read the [`Footer`] from a flatbuffer. 
pub(crate) fn from_flatbuffer( footer_bytes: FlatBuffer, diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index c529756dcee..055c4b53890 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -93,6 +93,7 @@ mod counting; mod file; mod footer; +pub mod multi; mod open; mod pruning; mod read; @@ -100,6 +101,7 @@ pub mod segments; mod strategy; #[cfg(test)] mod tests; +pub mod v2; mod writer; pub use file::*; diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs new file mode 100644 index 00000000000..19cbc6cae9a --- /dev/null +++ b/vortex-file/src/multi/mod.rs @@ -0,0 +1,227 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Builder for constructing a [`MultiDataSource`] from multiple Vortex files. + +mod session; + +use std::sync::Arc; + +use async_trait::async_trait; +use futures::TryStreamExt; +use session::MultiFileSessionExt; +use tracing::debug; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_io::filesystem::FileListing; +use vortex_io::filesystem::FileSystemRef; +use vortex_scan::api::DataSource; +use vortex_scan::api::DataSourceRef; +use vortex_scan::multi::DataSourceFactory; +use vortex_scan::multi::MultiDataSource; +use vortex_session::VortexSession; + +use crate::OpenOptionsSessionExt; +use crate::VortexOpenOptions; + +/// A builder that discovers multiple Vortex files from a glob pattern and constructs a +/// [`MultiDataSource`] to scan them as a single data source. +/// +/// The primary interface is [`Self::with_glob`], which accepts a glob +/// pattern (optionally prefixed with `file://`). For non-local filesystems (S3, GCS, etc.), +/// callers must also provide a [`FileSystemRef`] via [`Self::with_filesystem`]. 
+/// +/// # Examples +/// +/// ```ignore +/// // Local files — filesystem is auto-created: +/// let ds = MultiFileDataSource::new(session) +/// .with_glob("/data/warehouse/*.vortex") +/// .build() +/// .await?; +/// +/// // S3 — caller provides the filesystem: +/// let ds = MultiFileDataSource::new(session) +/// .with_filesystem(s3_fs) +/// .with_glob("prefix/*.vortex") +/// .build() +/// .await?; +/// ``` +pub struct MultiFileDataSource { + session: VortexSession, + fs: Option, + glob: Option, + open_options_fn: Arc VortexOpenOptions + Send + Sync>, +} + +impl MultiFileDataSource { + /// Create a new [`MultiFileDataSource`] builder. + pub fn new(session: VortexSession) -> Self { + Self { + session, + fs: None, + glob: None, + open_options_fn: Arc::new(|opts| opts), + } + } + + /// Set the path glob for file discovery. + /// + /// This path should be relative to the filesystem's base URL. + pub fn with_glob(mut self, glob: impl Into) -> Self { + self.glob = Some(glob.into().trim_start_matches("/").to_string()); + self + } + + /// Set the filesystem to use for file discovery and reading. + /// + /// Required for non-local URLs (S3, GCS, etc.). For `file://` or bare path URLs, + /// a local filesystem is created automatically if none is provided. + pub fn with_filesystem(mut self, fs: FileSystemRef) -> Self { + self.fs = Some(fs); + self + } + + /// Customize [`VortexOpenOptions`] applied to each file. + /// + /// Use this to configure segment caches, metrics registries, or other per-file options. + pub fn with_open_options( + mut self, + f: impl Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync + 'static, + ) -> Self { + self.open_options_fn = Arc::new(f); + self + } + + /// Build the [`MultiDataSource`]. + /// + /// Discovers files via glob, opens the first file eagerly to determine the schema, + /// and creates lazy factories for the remaining files. 
+ pub async fn build(mut self) -> VortexResult { + let glob = self + .glob + .take() + .ok_or_else(|| vortex_err!("MultiFileDataSource requires a glob URL"))?; + + let fs = match self.fs { + Some(fs) => fs, + None => create_local_filesystem(&self.session)?, + }; + let files: Vec = fs.glob(&glob)?.try_collect().await?; + + if files.is_empty() { + vortex_bail!("No files matched the glob pattern '{}'", glob); + } + + let file_count = files.len(); + debug!(file_count, glob = %glob, "discovered files"); + + // Open first file eagerly for dtype. + let first_file = + open_file(&fs, &files[0], &self.session, self.open_options_fn.as_ref()).await?; + let first_ds = first_file.data_source()?; + + let factories: Vec> = files[1..] + .iter() + .map(|f| { + Arc::new(VortexFileFactory { + fs: fs.clone(), + file: f.clone(), + session: self.session.clone(), + open_options_fn: self.open_options_fn.clone(), + }) as Arc + }) + .collect(); + + let inner = MultiDataSource::lazy(first_ds, factories, &self.session); + + debug!(file_count, dtype = %inner.dtype(), "built MultiFileDataSource"); + + Ok(inner) + } +} + +/// Creates a local filesystem backed by `object_store::local::LocalFileSystem`. +// TODO(ngates): create a native file system without an object_store dependency. +// Turns out it's not a trivial change because we have always used object_store with its own +// coalescing and concurrency configs, so we need to re-tune for local disk. 
+#[cfg(feature = "object_store")] +fn create_local_filesystem(session: &VortexSession) -> VortexResult { + use vortex_io::object_store::ObjectStoreFileSystem; + use vortex_io::session::RuntimeSessionExt; + + let store = Arc::new(object_store::local::LocalFileSystem::default()); + let fs: FileSystemRef = Arc::new(ObjectStoreFileSystem::new(store, session.handle())); + Ok(fs) +} + +#[cfg(not(feature = "object_store"))] +fn create_local_filesystem(_session: &VortexSession) -> VortexResult { + vortex_bail!( + "The 'object_store' feature is required for automatic local filesystem creation. \ + Either enable the feature or provide a filesystem via .with_filesystem()." + ); +} + +/// Open a single Vortex file, checking the session's footer cache. +async fn open_file( + fs: &FileSystemRef, + file: &FileListing, + session: &VortexSession, + open_options_fn: &(dyn Fn(VortexOpenOptions) -> VortexOpenOptions + Send + Sync), +) -> VortexResult { + debug!(path = %file.path, "opening vortex file"); + + // Open the reader first so we can use its URI as the cache key. + // The URI includes the full path (with any filesystem prefix), making it unique + // even when different PrefixFileSystem instances strip paths to the same relative name. + let source = fs.open_read(&file.path).await?; + let cache_key = source + .uri() + .map(|u| u.to_string()) + .unwrap_or_else(|| file.path.clone()); + + // Build open options. The DashMap Ref from multi_file() must not live across an await, + // so we scope the cache lookup in a block. + let options = { + let mut options = open_options_fn(session.open_options()); + if let Some(size) = file.size { + options = options.with_file_size(size); + } + if let Some(footer) = session.multi_file().get_footer(&cache_key) { + options = options.with_footer(footer); + } + options + }; + + let vortex_file = options.open(source).await?; + + // Store footer in cache (scoped to avoid holding the Ref across subsequent code). 
+ session + .multi_file() + .put_footer(&cache_key, vortex_file.footer().clone()); + Ok(vortex_file) +} + +/// A [`DataSourceFactory`] that lazily opens a single Vortex file. +struct VortexFileFactory { + fs: FileSystemRef, + file: FileListing, + session: VortexSession, + open_options_fn: Arc VortexOpenOptions + Send + Sync>, +} + +#[async_trait] +impl DataSourceFactory for VortexFileFactory { + async fn open(&self) -> VortexResult> { + let file = open_file( + &self.fs, + &self.file, + &self.session, + self.open_options_fn.as_ref(), + ) + .await?; + Ok(Some(file.data_source()?)) + } +} diff --git a/vortex-file/src/multi/session.rs b/vortex-file/src/multi/session.rs new file mode 100644 index 00000000000..d7705006d37 --- /dev/null +++ b/vortex-file/src/multi/session.rs @@ -0,0 +1,72 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Session extension for multi-file scanning, providing a shared footer cache. + +use std::fmt; +use std::fmt::Debug; + +use vortex_session::SessionExt; + +use crate::footer::Footer; + +/// Session state for multi-file scanning. +/// +/// Provides a shared, in-memory footer cache so that repeated scans over the same files +/// avoid redundant footer I/O. The cache is bounded by approximate total footer size and lives as long as +/// the [`VortexSession`](vortex_session::VortexSession). +/// +/// # Future Work +/// +/// Consider generalizing this cache into [`VortexOpenOptions`](crate::VortexOpenOptions) so +/// that single-file opens also benefit from session-level footer caching. 
+pub(super) struct MultiFileSession { + footer_cache: moka::sync::Cache, +} + +impl Default for MultiFileSession { + fn default() -> Self { + Self { + footer_cache: moka::sync::Cache::builder() + // Capacity and weigher are in KB + .max_capacity(100 * 1024) // 100MB + .weigher(|_k, footer: &Footer| { + footer + .approx_byte_size() + .and_then(|bytes| u32::try_from(bytes / 1024).ok()) + .unwrap_or(10) + }) + .build(), + } + } +} + +impl Debug for MultiFileSession { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MultiFileSession") + .field("footer_cache_entry_count", &self.footer_cache.entry_count()) + .finish() + } +} + +impl MultiFileSession { + /// Retrieve a cached footer for the given file path. + pub fn get_footer(&self, path: &str) -> Option