Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 105 additions & 38 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::schema::types::ColumnDescPtr;
use crate::util::utf8::check_valid_utf8;
use arrow_array::{ArrayRef, builder::make_view};
use arrow_buffer::Buffer;
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
Expand Down Expand Up @@ -450,6 +449,35 @@ impl ByteViewArrayDecoderPlain {
}
}

/// Rewrite `view`'s buffer index by `base` when the dictionary buffers were
/// appended later than position 0 in the output buffer list. Inlined views
/// (length ≤ 12) carry their data in the high 96 bits and must be copied
/// verbatim. Written branchlessly so LLVM emits `csel`/`cmov` inside the
/// hot chunked gather loop instead of a per-view conditional branch.
#[inline(always)]
fn adjust_buffer_index(view: u128, base: u32) -> u128 {
    // View layout: bits [0..32) = len, [64..96) = buffer_index (long-view only).
    // Build an all-ones mask for long views (len > 12) and an all-zeros mask
    // for inlined ones, so the shifted base is selected without a branch.
    let long_mask = 0u128.wrapping_sub(((view as u32) > 12) as u128);
    view.wrapping_add(((base as u128) << 64) & long_mask)
}

/// Slow-path error constructor for a chunk whose validity check failed. Kept
/// out of the hot loop so the fast path stays small.
#[cold]
#[inline(never)]
fn invalid_dict_key(chunk: &[i32], dict_len: usize) -> ParquetError {
    // Re-scan the chunk to recover a concrete offending key for the message.
    // Negative keys sign-extend to very large usize values, so they fail the
    // `>= dict_len` comparison here just as they failed the u32 check in the
    // caller's fast path.
    let mut bad_key = 0;
    for &key in chunk {
        if key as usize >= dict_len {
            bad_key = key;
            break;
        }
    }
    general_err!("invalid key={} for dictionary of length {}", bad_key, dict_len)
}

/// Decoder for dictionary-encoded byte-view column data: reads dictionary
/// keys and resolves each key to a view from the page dictionary when
/// producing output.
pub struct ByteViewArrayDecoderDictionary {
    // Decodes the stream of dictionary key indices for this page; keys are
    // later used to index into the dictionary's view list.
    decoder: DictIndexDecoder,
}
Expand Down Expand Up @@ -500,52 +528,91 @@ impl ByteViewArrayDecoderDictionary {
// then the base_buffer_idx is 5 - 2 = 3
let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;

// Pre-reserve output capacity to avoid per-chunk reallocation in extend
// Pre-reserve output capacity so the gather loop can write through a raw
// pointer without `Vec::extend`'s per-element capacity checks.
output.views.reserve(len);

let mut error = None;
let dict_views: &[u128] = dict.views.as_slice();
let dict_len = dict_views.len();

let read = self.decoder.read(len, |keys| {
// SAFETY: `output.views.reserve(len)` was called above and the
// outer loop ensures we never write more than `len` views.
let out_ptr = unsafe { output.views.as_mut_ptr().add(output.views.len()) };

// Process 8-key chunks with a bulk validity check that the compiler
// can autovectorise, then use `get_unchecked` in the gather loop.
// Mirrors the pattern in `RleDecoder::get_batch_with_dict`.
const CHUNK: usize = 8;
let mut chunks = keys.chunks_exact(CHUNK);
let mut written = 0usize;
// Cast to u32 so that any negative i32 (corrupt data) compares as a
// very large value and fails the check.
let dict_len_u32 = dict_len as u32;

if base_buffer_idx == 0 {
// the dictionary buffers are the last buffers in output, we can directly use the views
output
.views
.extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
Some(&view) => view,
None => {
if error.is_none() {
error = Some(general_err!("invalid key={} for dictionary", *k));
}
0
for chunk in chunks.by_ref() {
// Branchless max-reduction over 8 keys: LLVM emits a SIMD
// umax sequence on aarch64/x86_64 instead of the short-
// circuited `.all()` form which compiles to a chain of
// per-key `cmp + b.ls`.
let max_key = chunk.iter().fold(0u32, |acc, &k| acc.max(k as u32));
if max_key >= dict_len_u32 {
return Err(invalid_dict_key(chunk, dict_len));
}
for (i, &k) in chunk.iter().enumerate() {
// SAFETY: bounds checked above.
unsafe {
let view = *dict_views.get_unchecked(k as usize);
out_ptr.add(written + i).write(view);
}
}));
Ok(())
}
written += CHUNK;
}
for &k in chunks.remainder() {
let view = *dict_views
.get(k as usize)
.ok_or_else(|| general_err!("invalid key={k} for dictionary"))?;
// SAFETY: remainder writes stay within the reserved range.
unsafe { out_ptr.add(written).write(view) };
written += 1;
}
} else {
output
.views
.extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
Some(&view) => {
let len = view as u32;
if len <= 12 {
view
} else {
let mut view = ByteView::from(view);
view.buffer_index += base_buffer_idx;
view.into()
}
}
None => {
if error.is_none() {
error = Some(general_err!("invalid key={} for dictionary", *k));
}
0
for chunk in chunks.by_ref() {
let max_key = chunk.iter().fold(0u32, |acc, &k| acc.max(k as u32));
if max_key >= dict_len_u32 {
return Err(invalid_dict_key(chunk, dict_len));
}
for (i, &k) in chunk.iter().enumerate() {
// SAFETY: bounds checked above.
unsafe {
let view = *dict_views.get_unchecked(k as usize);
out_ptr
.add(written + i)
.write(adjust_buffer_index(view, base_buffer_idx));
}
}));
Ok(())
}
written += CHUNK;
}
for &k in chunks.remainder() {
let view = *dict_views
.get(k as usize)
.ok_or_else(|| general_err!("invalid key={k} for dictionary"))?;
// SAFETY: remainder writes stay within the reserved range.
unsafe {
out_ptr
.add(written)
.write(adjust_buffer_index(view, base_buffer_idx))
};
written += 1;
}
}

// SAFETY: we wrote exactly `written == keys.len()` new views.
debug_assert_eq!(written, keys.len());
unsafe { output.views.set_len(output.views.len() + written) };
Ok(())
})?;
if let Some(e) = error {
return Err(e);
}
Ok(read)
}

Expand Down
Loading