diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 9f4d2a33df0..2dda2bf2108 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -251,6 +251,11 @@ name = "arrow_reader_clickbench" required-features = ["arrow", "async", "object_store"] harness = false +[[bench]] +name = "cached_array_reader" +required-features = ["arrow", "experimental"] +harness = false + [[bench]] name = "compression" required-features = ["experimental", "default"] diff --git a/parquet/benches/cached_array_reader.rs b/parquet/benches/cached_array_reader.rs new file mode 100644 index 00000000000..6977200b89c --- /dev/null +++ b/parquet/benches/cached_array_reader.rs @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::{Criterion, criterion_group, criterion_main}; +use parquet::arrow::array_reader::{ArrayReader, CacheRole, CachedArrayReader, RowGroupCache}; +use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use parquet::errors::Result; + +use arrow_array::ArrayRef; +use arrow_array::builder::StringBuilder; +use arrow_schema::DataType as ArrowType; +use rand::{Rng, SeedableRng, rngs::StdRng}; +use std::any::Any; +use std::hint::black_box; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; + +const TOTAL_ROWS: usize = 4_194_304; +const BATCH_SIZE: usize = 1_024; +const ESTIMATED_AVG_BYTES_PER_ROW_NUM: usize = 67; +const ESTIMATED_AVG_BYTES_PER_ROW_DEN: usize = 8; + +#[derive(Clone, Copy)] +enum SelectionOp { + Read(usize), + Skip(usize), +} + +struct MockArrayRefReader { + data: ArrayRef, + position: usize, + records_to_consume: usize, + data_type: ArrowType, +} + +impl MockArrayRefReader { + fn new(data: ArrayRef) -> Self { + Self { + data_type: data.data_type().clone(), + data, + position: 0, + records_to_consume: 0, + } + } +} + +impl ArrayReader for MockArrayRefReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_read = std::cmp::min(batch_size, remaining); + self.records_to_consume += to_read; + Ok(to_read) + } + + fn consume_batch(&mut self) -> Result { + let start = self.position; + let end = start + self.records_to_consume; + self.position = end; + self.records_to_consume = 0; + Ok(self.data.slice(start, end - start)) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_skip = std::cmp::min(num_records, remaining); + self.position += to_skip; + Ok(to_skip) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } +} + +struct BenchCase { + data: ArrayRef, + selection_ops: Vec, + selected_rows: usize, +} + +impl BenchCase { + fn new() -> Self { + let data = make_string_array(TOTAL_ROWS); + let (selection_ops, selected_rows) = make_selection_ops(TOTAL_ROWS); + + Self { + data, + selection_ops, + selected_rows, + } + } + + fn prepare_reader(&self) -> CachedArrayReader { + let metrics = ArrowReaderMetrics::disabled(); + let cache = Arc::new(RwLock::new(RowGroupCache::new(BATCH_SIZE, usize::MAX))); + let mut reader = CachedArrayReader::new( + Box::new(MockArrayRefReader::new(self.data.clone())), + cache, + 0, + CacheRole::Consumer, + metrics, + ); + + for op in &self.selection_ops { + match op { + SelectionOp::Read(count) => { + assert_eq!(reader.read_records(*count).unwrap(), *count); + } + SelectionOp::Skip(count) => { + assert_eq!(reader.skip_records(*count).unwrap(), *count); + } + } + } + + reader + } +} + +fn make_string_array(total_rows: usize) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(44); + let value_capacity = + total_rows * ESTIMATED_AVG_BYTES_PER_ROW_NUM / ESTIMATED_AVG_BYTES_PER_ROW_DEN; + let mut builder = StringBuilder::with_capacity(total_rows, value_capacity); + + for _ in 0..total_rows { + let value = if rng.random_bool(0.5) { + let len = if rng.random_bool(0.5) { + rng.random_range(13..21) + } else { + rng.random_range(3..12) + }; + + (0..len) + .map(|_| (b'a' + rng.random_range(0..26)) as char) + .collect() + } else { + "const".to_string() + }; + builder.append_value(value); + } + + Arc::new(builder.finish()) +} + +fn make_selection_ops(total_rows: usize) -> (Vec, usize) { + let mut rng = StdRng::seed_from_u64(9060); + let mut remaining = total_rows; + let mut selected_rows = 0; + let mut ops = Vec::new(); + + while remaining > 0 { + // Match the issue more closely: small selected runs and much longer gaps. + let read = std::cmp::min(rng.random_range(4..11), remaining); + ops.push(SelectionOp::Read(read)); + selected_rows += read; + remaining -= read; + + if remaining == 0 { + break; + } + + let skip = std::cmp::min(rng.random_range(35..66), remaining); + ops.push(SelectionOp::Skip(skip)); + remaining -= skip; + } + + (ops, selected_rows) +} + +fn cached_array_reader_benchmark(c: &mut Criterion) { + let case = BenchCase::new(); + + let array = case.prepare_reader().consume_batch().unwrap(); + assert_eq!(array.len(), case.selected_rows); + + let mut group = c.benchmark_group("cached_array_reader"); + group.sample_size(30); + group.measurement_time(Duration::from_secs(4)); + group.bench_function("utf8_sparse_cross_batch_4m_rows/consume_batch", |b| { + b.iter_custom(|iters| { + let mut total = Duration::ZERO; + for _ in 0..iters { + let mut reader = case.prepare_reader(); + let start = Instant::now(); + let array = reader.consume_batch().unwrap(); + black_box(&array); + total += start.elapsed(); + } + total + }) + }); + group.finish(); +} + +criterion_group!(benches, cached_array_reader_benchmark); +criterion_main!(benches); diff --git a/parquet/src/arrow/array_reader/cached_array_reader.rs b/parquet/src/arrow/array_reader/cached_array_reader.rs index 21f0c2afa41..404069e6791 100644 --- a/parquet/src/arrow/array_reader/cached_array_reader.rs +++ b/parquet/src/arrow/array_reader/cached_array_reader.rs @@ -21,9 +21,17 @@ use crate::arrow::array_reader::row_group_cache::BatchID; use crate::arrow::array_reader::{ArrayReader, row_group_cache::RowGroupCache}; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::errors::Result; -use arrow_array::{ArrayRef, BooleanArray, new_empty_array}; -use arrow_buffer::BooleanBufferBuilder; +use arrow_array::Array; +use arrow_array::cast::AsArray; +use arrow_array::types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, Utf8Type}; +use arrow_array::{ArrayRef, BooleanArray, make_array, new_empty_array}; +use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder}; +use arrow_data::{ + ArrayDataBuilder, + transform::{Capacities, MutableArrayData}, +}; use arrow_schema::DataType as ArrowType; +use arrow_select::filter::SlicesIterator; use std::any::Any; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -89,6 +97,12 @@ pub struct CachedArrayReader { metrics: ArrowReaderMetrics, } +struct SelectedBatch { + array: ArrayRef, + mask: BooleanBuffer, + selected_rows: usize, +} + impl CachedArrayReader { /// Creates a new cached array reader with the specified role pub fn new( @@ -183,6 +197,207 @@ impl CachedArrayReader { } } } + + fn coalesce_selected_batches( + selected_batches: Vec, + selected_row_count: usize, + ) -> ArrayRef { + match selected_batches[0].array.data_type() { + ArrowType::Utf8 => { + return Self::coalesce_selected_byte_batches::( + &selected_batches, + selected_row_count, + ); + } + ArrowType::LargeUtf8 => { + return Self::coalesce_selected_byte_batches::( + &selected_batches, + selected_row_count, + ); + } + ArrowType::Binary => { + return Self::coalesce_selected_byte_batches::( + &selected_batches, + selected_row_count, + ); + } + ArrowType::LargeBinary => { + return Self::coalesce_selected_byte_batches::( + &selected_batches, + selected_row_count, + ); + } + _ => {} + } + + // Copy directly from cached batch slices so we don't materialize + // filtered arrays only to concatenate them again. + let capacities = Self::coalesce_capacities(&selected_batches, selected_row_count); + let source_data = selected_batches + .iter() + .map(|selected_batch| selected_batch.array.to_data()) + .collect::>(); + let mut mutable = + MutableArrayData::with_capacities(source_data.iter().collect(), false, capacities); + + for (source_index, selected_batch) in selected_batches.iter().enumerate() { + if selected_batch.selected_rows == selected_batch.array.len() { + mutable.extend(source_index, 0, selected_batch.array.len()); + continue; + } + + for (start, end) in SlicesIterator::from(&selected_batch.mask) { + mutable.extend(source_index, start, end); + } + } + + make_array(mutable.freeze()) + } + + fn coalesce_selected_byte_batches( + selected_batches: &[SelectedBatch], + selected_row_count: usize, + ) -> ArrayRef { + let mut cur_offset = T::Offset::from_usize(0).unwrap(); + let mut dst_offsets = Vec::with_capacity(selected_row_count + 1); + dst_offsets.push(cur_offset); + + let mut validity = selected_batches + .iter() + .any(|selected_batch| selected_batch.array.null_count() > 0) + .then(|| BooleanBufferBuilder::new(selected_row_count)); + + for selected_batch in selected_batches { + let array = selected_batch.array.as_ref().as_bytes::(); + let offsets = array.value_offsets(); + + if selected_batch.selected_rows == selected_batch.array.len() { + for idx in 0..array.len() { + cur_offset += offsets[idx + 1] - offsets[idx]; + dst_offsets.push(cur_offset); + } + + if let Some(validity) = validity.as_mut() { + Self::append_selected_validity(validity, array.nulls(), 0, array.len()); + } + continue; + } + + for (start, end) in SlicesIterator::from(&selected_batch.mask) { + for idx in start..end { + cur_offset += offsets[idx + 1] - offsets[idx]; + dst_offsets.push(cur_offset); + } + + if let Some(validity) = validity.as_mut() { + Self::append_selected_validity(validity, array.nulls(), start, end - start); + } + } + } + + let mut dst_values = Vec::with_capacity(cur_offset.as_usize()); + for selected_batch in selected_batches { + let array = selected_batch.array.as_ref().as_bytes::(); + let offsets = array.value_offsets(); + let values = array.value_data(); + + if selected_batch.selected_rows == selected_batch.array.len() { + dst_values.extend_from_slice( + &values[offsets[0].as_usize()..offsets[array.len()].as_usize()], + ); + continue; + } + + for (start, end) in SlicesIterator::from(&selected_batch.mask) { + dst_values + .extend_from_slice(&values[offsets[start].as_usize()..offsets[end].as_usize()]); + } + } + + let mut builder = ArrayDataBuilder::new(T::DATA_TYPE) + .len(selected_row_count) + .add_buffer(dst_offsets.into()) + .add_buffer(dst_values.into()); + + if let Some(mut validity) = validity { + let validity = validity.finish(); + let null_count = selected_row_count - validity.count_set_bits(); + if null_count > 0 { + builder = builder + .null_count(null_count) + .null_bit_buffer(Some(validity.into_inner())); + } + } + + make_array(unsafe { builder.build_unchecked() }) + } + + fn append_selected_validity( + validity: &mut BooleanBufferBuilder, + nulls: Option<&arrow_buffer::NullBuffer>, + start: usize, + len: usize, + ) { + match nulls { + Some(nulls) if nulls.null_count() > 0 => { + let nulls = nulls.inner(); + validity.append_packed_range( + start + nulls.offset()..start + nulls.offset() + len, + nulls.values(), + ); + } + _ => validity.append_n(len, true), + } + } + + fn coalesce_capacities( + selected_batches: &[SelectedBatch], + selected_row_count: usize, + ) -> Capacities { + match selected_batches[0].array.data_type() { + ArrowType::Utf8 => Capacities::Binary( + selected_row_count, + Some(Self::selected_bytes_capacity::(selected_batches)), + ), + ArrowType::LargeUtf8 => Capacities::Binary( + selected_row_count, + Some(Self::selected_bytes_capacity::( + selected_batches, + )), + ), + ArrowType::Binary => Capacities::Binary( + selected_row_count, + Some(Self::selected_bytes_capacity::( + selected_batches, + )), + ), + ArrowType::LargeBinary => Capacities::Binary( + selected_row_count, + Some(Self::selected_bytes_capacity::( + selected_batches, + )), + ), + _ => Capacities::Array(selected_row_count), + } + } + + fn selected_bytes_capacity(selected_batches: &[SelectedBatch]) -> usize { + selected_batches + .iter() + .map(|selected_batch| { + let array = selected_batch.array.as_ref().as_bytes::(); + let offsets = array.value_offsets(); + + if selected_batch.selected_rows == selected_batch.array.len() { + offsets[offsets.len() - 1].as_usize() - offsets[0].as_usize() + } else { + SlicesIterator::from(&selected_batch.mask) + .map(|(start, end)| offsets[end].as_usize() - offsets[start].as_usize()) + .sum::() + } + }) + .sum() + } } impl ArrayReader for CachedArrayReader { @@ -280,7 +495,8 @@ impl ArrayReader for CachedArrayReader { let start_batch = start_position / self.batch_size; let end_batch = (start_position + row_count - 1) / self.batch_size; - let mut selected_arrays = Vec::new(); + let selected_row_count = selection_buffer.count_set_bits(); + let mut selected_batches = Vec::new(); for batch_id in start_batch..=end_batch { let batch_start = batch_id * self.batch_size; let batch_end = batch_start + self.batch_size - 1; @@ -298,19 +514,22 @@ impl ArrayReader for CachedArrayReader { let selection_length = overlap_end - overlap_start + 1; let mask = selection_buffer.slice(selection_start, selection_length); - if mask.count_set_bits() == 0 { + let selected_rows = mask.count_set_bits(); + if selected_rows == 0 { continue; } - let mask_array = BooleanArray::from(mask); // Read from local cache instead of shared cache to avoid cache eviction issues let cached = self .local_cache .get(&batch_id) .expect("data must be already cached in the read_records call, this is a bug"); let cached = cached.slice(overlap_start - batch_start, selection_length); - let filtered = arrow_select::filter::filter(&cached, &mask_array)?; - selected_arrays.push(filtered); + selected_batches.push(SelectedBatch { + array: cached, + mask, + selected_rows, + }); } self.selections = BooleanBufferBuilder::new(0); @@ -327,15 +546,26 @@ impl ArrayReader for CachedArrayReader { self.cleanup_consumed_batches(); } - match selected_arrays.len() { + match selected_batches.len() { 0 => Ok(new_empty_array(self.inner.get_data_type())), - 1 => Ok(selected_arrays.into_iter().next().unwrap()), - _ => Ok(arrow_select::concat::concat( - &selected_arrays - .iter() - .map(|a| a.as_ref()) - .collect::>(), - )?), + 1 => { + let SelectedBatch { + array, + mask, + selected_rows, + } = selected_batches.into_iter().next().unwrap(); + + if selected_rows == array.len() { + Ok(array) + } else { + let mask = BooleanArray::from(mask); + Ok(arrow_select::filter::filter(array.as_ref(), &mask)?) + } + } + _ => Ok(Self::coalesce_selected_batches( + selected_batches, + selected_row_count, + )), } } @@ -353,7 +583,7 @@ mod tests { use super::*; use crate::arrow::array_reader::ArrayReader; use crate::arrow::array_reader::row_group_cache::RowGroupCache; - use arrow_array::{ArrayRef, Int32Array}; + use arrow_array::{ArrayRef, Int32Array, StringArray}; use std::sync::{Arc, RwLock}; // Mock ArrayReader for testing @@ -416,6 +646,64 @@ mod tests { } } + struct MockArrayRefReader { + data: ArrayRef, + position: usize, + records_to_consume: usize, + data_type: ArrowType, + } + + impl MockArrayRefReader { + fn new(data: ArrayRef) -> Self { + Self { + data_type: data.data_type().clone(), + data, + position: 0, + records_to_consume: 0, + } + } + } + + impl ArrayReader for MockArrayRefReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn read_records(&mut self, batch_size: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_read = std::cmp::min(batch_size, remaining); + self.records_to_consume += to_read; + Ok(to_read) + } + + fn consume_batch(&mut self) -> Result { + let start = self.position; + let end = start + self.records_to_consume; + self.position = end; + self.records_to_consume = 0; + Ok(self.data.slice(start, end - start)) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + let remaining = self.data.len() - self.position; + let to_skip = std::cmp::min(num_records, remaining); + self.position += to_skip; + Ok(to_skip) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } + } + #[test] fn test_cached_reader_basic() { let metrics = ArrowReaderMetrics::disabled(); @@ -759,4 +1047,30 @@ mod tests { let int32_array = array.as_any().downcast_ref::().unwrap(); assert_eq!(int32_array.values(), &[1, 2, 3, 4]); } + + #[test] + fn test_sparse_string_selection_across_cached_batches() { + let metrics = ArrowReaderMetrics::disabled(); + let cache = Arc::new(RwLock::new(RowGroupCache::new(3, usize::MAX))); // Batch size 3 + let data: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e", "f"])); + let mut cached_reader = CachedArrayReader::new( + Box::new(MockArrayRefReader::new(data)), + cache, + 0, + CacheRole::Consumer, + metrics, + ); + + assert_eq!(cached_reader.read_records(2).unwrap(), 2); + assert_eq!(cached_reader.skip_records(2).unwrap(), 2); + assert_eq!(cached_reader.read_records(2).unwrap(), 2); + + let array = cached_reader.consume_batch().unwrap(); + assert_eq!(array.len(), 4); + let string_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(string_array.value(0), "a"); + assert_eq!(string_array.value(1), "b"); + assert_eq!(string_array.value(2), "e"); + assert_eq!(string_array.value(3), "f"); + } } diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index 726eae1f51c..0016b141566 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -59,6 +59,8 @@ pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks pub use byte_view_array::make_byte_view_array_reader; #[allow(unused_imports)] // Only used for benchmarks +pub use cached_array_reader::{CacheRole, CachedArrayReader}; +#[allow(unused_imports)] // Only used for benchmarks pub use fixed_len_byte_array::make_fixed_len_byte_array_reader; pub use fixed_size_list_array::FixedSizeListArrayReader; pub use list_array::ListArrayReader;