-
Notifications
You must be signed in to change notification settings - Fork 135
Use Scan API #6391
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Use Scan API #6391
Changes from all commits
470fbe9
f965615
ae39e1a
f41ccff
23510e4
e925792
f8a7543
d05997f
5f6123a
c5e60a4
2d6c816
5a7e047
07cb0d3
b19d03b
899675e
811acd5
bbbaa62
717ebab
0a9b542
25cac35
367baaa
3b7d0ae
aa46b99
ac06b32
270b242
6aa5a0b
7aaf8b6
669cedb
392f263
a45bf71
4aff18a
fa7a431
fe6fbec
ccc13c3
07562d6
7737131
b6e1142
35eeec7
1da2a28
c05641b
5d59027
d2aa4fb
e1330dd
d396f02
9e54416
8b2ca17
68ec238
d7ea77b
6f030fe
dfb275c
f87b42a
34cff37
978ec48
f28bd1a
9f671fe
5db5de4
92b5910
ac962ac
a8384a5
61d4b6a
1f849ce
9c82df8
fa37371
e658d6c
826216d
2acb89f
847075f
0e639e2
d24bd64
04da834
d856d3d
00ba860
e9dd877
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,13 +26,15 @@ use datafusion_physical_plan::collect; | |
| use futures::StreamExt; | ||
| use parking_lot::Mutex; | ||
| use tokio::fs::File; | ||
| use vortex::scan::api::DataSourceRef; | ||
| use vortex_bench::Benchmark; | ||
| use vortex_bench::BenchmarkArg; | ||
| use vortex_bench::CompactionStrategy; | ||
| use vortex_bench::Engine; | ||
| use vortex_bench::Format; | ||
| use vortex_bench::Opt; | ||
| use vortex_bench::Opts; | ||
| use vortex_bench::SESSION; | ||
| use vortex_bench::conversions::convert_parquet_directory_to_vortex; | ||
| use vortex_bench::create_benchmark; | ||
| use vortex_bench::create_output_writer; | ||
|
|
@@ -220,13 +222,20 @@ async fn main() -> anyhow::Result<()> { | |
| Ok(()) | ||
| } | ||
|
|
||
/// Reports whether the scan-API code path was requested through the environment.
///
/// Returns `true` only when `VORTEX_USE_SCAN_API` is set to exactly `"1"`. An unset variable,
/// a value that is not valid Unicode, or any other value yields `false`.
fn use_scan_api() -> bool {
    matches!(std::env::var("VORTEX_USE_SCAN_API").as_deref(), Ok("1"))
}
|
|
||
| async fn register_benchmark_tables<B: Benchmark + ?Sized>( | ||
| session: &SessionContext, | ||
| benchmark: &B, | ||
| format: Format, | ||
| ) -> anyhow::Result<()> { | ||
| match format { | ||
| Format::Arrow => register_arrow_tables(session, benchmark).await, | ||
| _ if use_scan_api() && matches!(format, Format::OnDiskVortex | Format::VortexCompact) => { | ||
| register_v2_tables(session, benchmark, format).await | ||
| } | ||
| _ => { | ||
| let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?; | ||
| let file_format = format_to_df_format(format); | ||
|
|
@@ -265,6 +274,54 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>( | |
| } | ||
| } | ||
|
|
||
| /// Register tables using the V2 `VortexTable` + `MultiFileDataSource` path. | ||
///
/// For every table spec of `benchmark`, this builds a `MultiFileDataSource` over the files
/// matching the table's pattern (or `*.<ext>` for `format` when the benchmark supplies no
/// pattern), wraps it in a `VortexTable`, and registers it on `session` under the table's name.
///
/// # Errors
///
/// Propagates any failure from joining the base URL, building the listing URL, looking up the
/// object store, building the data source, converting the dtype to an Arrow schema, or
/// registering the table.
| async fn register_v2_tables<B: Benchmark + ?Sized>( | ||
| session: &SessionContext, | ||
| benchmark: &B, | ||
| format: Format, | ||
| ) -> anyhow::Result<()> { | ||
// TODO(review): a reviewer suggested hoisting these function-local imports to the top of the
// file and logging here so it is visible which API path is in use.
| use vortex::file::multi::MultiFileDataSource; | ||
| use vortex::io::object_store::ObjectStoreFileSystem; | ||
| use vortex::io::session::RuntimeSessionExt; | ||
// Anonymous trait import: brings the `DataSource` trait's methods into scope without binding
// the name (presumably needed for the `dtype()` call below; confirm).
| use vortex::scan::api::DataSource as _; | ||
| use vortex_datafusion::v2::VortexTable; | ||
|
Comment on lines
+283
to
+287
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move to top? also - lets log here so its visible which API is used |
||
|
|
||
// Base URL for this format's data: `<data_url>/<format name>/`.
| let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?; | ||
|
|
||
| for table in benchmark.table_specs().iter() { | ||
| let pattern = benchmark.pattern(table.name, format); | ||
// Look up the object store for this table URL in the session's runtime environment.
| let table_url = ListingTableUrl::try_new(benchmark_base.clone(), pattern.clone())?; | ||
| let store = session | ||
| .state() | ||
| .runtime_env() | ||
| .object_store(table_url.object_store())?; | ||
|
|
||
// Wrap the store in an `ObjectStoreFileSystem`, then apply the base URL's path (with any
// leading '/' characters stripped) as the filesystem prefix.
| let fs: vortex::io::filesystem::FileSystemRef = | ||
| Arc::new(ObjectStoreFileSystem::new(store.clone(), SESSION.handle())); | ||
| let base_prefix = benchmark_base.path().trim_start_matches('/').to_string(); | ||
| let fs = fs.with_prefix(base_prefix); | ||
|
|
||
// Use the benchmark's pattern when it provides one; otherwise glob on the format's extension.
| let glob_pattern = match &pattern { | ||
| Some(p) => p.as_str().to_string(), | ||
| None => format!("*.{}", format.ext()), | ||
| }; | ||
|
|
||
| let multi_ds = MultiFileDataSource::new(SESSION.clone()) | ||
| .with_filesystem(fs) | ||
| .with_glob(glob_pattern) | ||
| .build() | ||
| .await?; | ||
|
|
||
// Derive the Arrow schema from the data source's dtype before storing the source behind a
// `DataSourceRef`.
| let arrow_schema = Arc::new(multi_ds.dtype().to_arrow_schema()?); | ||
| let data_source: DataSourceRef = Arc::new(multi_ds); | ||
|
|
||
| let table_provider = Arc::new(VortexTable::new(data_source, SESSION.clone(), arrow_schema)); | ||
| session.register_table(table.name, table_provider)?; | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Load Arrow IPC files into in-memory DataFusion tables. | ||
| async fn register_arrow_tables<B: Benchmark + ?Sized>( | ||
| session: &SessionContext, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,8 @@ use crate::scalar::ScalarValue; | |
| /// This is statistic-specific: for max, this will be an upper bound, meaning that the actual max | ||
| /// in an array is guaranteed to be less than or equal to the inexact value, but equal to the exact | ||
| /// value. | ||
| /// | ||
| // TODO(ngates): should we model Unknown as a variant of Precision? Or have Option<Precision<T>>? | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| #[derive(Debug, PartialEq, Eq, Clone, Copy)] | ||
| pub enum Precision<T> { | ||
| Exact(T), | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.