diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs
index 1933654118f..91970f79f17 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -30,7 +30,6 @@ use crate::schema::types::ColumnDescPtr;
 use crate::util::utf8::check_valid_utf8;
 use arrow_array::{ArrayRef, builder::make_view};
 use arrow_buffer::Buffer;
-use arrow_data::ByteView;
 use arrow_schema::DataType as ArrowType;
 use bytes::Bytes;
 use std::any::Any;
@@ -450,6 +449,35 @@ impl ByteViewArrayDecoderPlain {
     }
 }
 
+/// Rewrite `view`'s buffer index by `base` when the dictionary buffers were
+/// appended later than position 0 in the output buffer list. Inlined views
+/// (length ≤ 12) carry their data in the high 96 bits and must be copied
+/// verbatim. Written branchlessly so LLVM can emit `csel`/`cmov` inside the
+/// hot chunked gather loop instead of a per-view conditional branch.
+#[inline(always)]
+fn adjust_buffer_index(view: u128, base: u32) -> u128 {
+    // View layout: bits [0..32] = len, [64..96] = buffer_index (long-view only).
+    let is_long = ((view as u32) > 12) as u128;
+    view.wrapping_add((is_long * base as u128) << 64) // no carry: buffer_index + base < output.buffers.len()
+}
+
+/// Slow-path error constructor for a chunk whose validity check failed. Kept
+/// out of the hot loop so the fast path stays small.
+#[cold]
+#[inline(never)]
+fn invalid_dict_key(chunk: &[i32], dict_len: usize) -> ParquetError {
+    let bad = chunk
+        .iter()
+        .copied()
+        .find(|&k| (k as usize) >= dict_len)
+        .unwrap_or(0);
+    general_err!(
+        "invalid key={} for dictionary of length {}",
+        bad,
+        dict_len
+    )
+}
+
 pub struct ByteViewArrayDecoderDictionary {
     decoder: DictIndexDecoder,
 }
@@ -500,52 +528,91 @@ impl ByteViewArrayDecoderDictionary {
         // then the base_buffer_idx is 5 - 2 = 3
         let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;
 
-        // Pre-reserve output capacity to avoid per-chunk reallocation in extend
+        // Pre-reserve output capacity so the gather loop can write through a raw
+        // pointer without `Vec::extend`'s per-element capacity checks.
         output.views.reserve(len);
 
-        let mut error = None;
+        let dict_views: &[u128] = dict.views.as_slice();
+        let dict_len = dict_views.len();
+
         let read = self.decoder.read(len, |keys| {
+            // SAFETY: `output.views.reserve(len)` was called above and the
+            // outer loop ensures we never write more than `len` views.
+            let out_ptr = unsafe { output.views.as_mut_ptr().add(output.views.len()) };
+
+            // Process 8-key chunks with a bulk validity check that the compiler
+            // can autovectorise, then use `get_unchecked` in the gather loop.
+            // Mirrors the pattern in `RleDecoder::get_batch_with_dict`.
+            const CHUNK: usize = 8;
+            let mut chunks = keys.chunks_exact(CHUNK);
+            let mut written = 0usize;
+            // Cast to u32 so that any negative i32 (corrupt data) compares as a
+            // very large value and fails the check.
+            let dict_len_u32 = dict_len as u32;
+
             if base_buffer_idx == 0 {
-                // the dictionary buffers are the last buffers in output, we can directly use the views
-                output
-                    .views
-                    .extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
-                        Some(&view) => view,
-                        None => {
-                            if error.is_none() {
-                                error = Some(general_err!("invalid key={} for dictionary", *k));
-                            }
-                            0
+                for chunk in chunks.by_ref() {
+                    // Branchless max-reduction over 8 keys: lets LLVM emit a SIMD
+                    // umax sequence on aarch64/x86_64 instead of the short-
+                    // circuited `.all()` form which compiles to a chain of
+                    // per-key `cmp + b.ls`.
+                    let max_key = chunk.iter().fold(0u32, |acc, &k| acc.max(k as u32));
+                    if max_key >= dict_len_u32 {
+                        return Err(invalid_dict_key(chunk, dict_len));
+                    }
+                    for (i, &k) in chunk.iter().enumerate() {
+                        // SAFETY: bounds checked above.
+                        unsafe {
+                            let view = *dict_views.get_unchecked(k as usize);
+                            out_ptr.add(written + i).write(view);
                         }
-                    }));
-                Ok(())
+                    }
+                    written += CHUNK;
+                }
+                for &k in chunks.remainder() {
+                    let view = *dict_views
+                        .get(k as usize)
+                        .ok_or_else(|| general_err!("invalid key={k} for dictionary of length {dict_len}"))?;
+                    // SAFETY: remainder writes stay within the reserved range.
+                    unsafe { out_ptr.add(written).write(view) };
+                    written += 1;
+                }
             } else {
-                output
-                    .views
-                    .extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
-                        Some(&view) => {
-                            let len = view as u32;
-                            if len <= 12 {
-                                view
-                            } else {
-                                let mut view = ByteView::from(view);
-                                view.buffer_index += base_buffer_idx;
-                                view.into()
-                            }
-                        }
-                        None => {
-                            if error.is_none() {
-                                error = Some(general_err!("invalid key={} for dictionary", *k));
-                            }
-                            0
+                for chunk in chunks.by_ref() {
+                    let max_key = chunk.iter().fold(0u32, |acc, &k| acc.max(k as u32));
+                    if max_key >= dict_len_u32 {
+                        return Err(invalid_dict_key(chunk, dict_len));
+                    }
+                    for (i, &k) in chunk.iter().enumerate() {
+                        // SAFETY: bounds checked above.
+                        unsafe {
+                            let view = *dict_views.get_unchecked(k as usize);
+                            out_ptr
+                                .add(written + i)
+                                .write(adjust_buffer_index(view, base_buffer_idx));
                         }
-                    }));
-                Ok(())
+                    }
+                    written += CHUNK;
+                }
+                for &k in chunks.remainder() {
+                    let view = *dict_views
+                        .get(k as usize)
+                        .ok_or_else(|| general_err!("invalid key={k} for dictionary of length {dict_len}"))?;
+                    // SAFETY: remainder writes stay within the reserved range.
+                    unsafe {
+                        out_ptr
+                            .add(written)
+                            .write(adjust_buffer_index(view, base_buffer_idx))
+                    };
+                    written += 1;
+                }
             }
+
+            // SAFETY: we wrote exactly `written == keys.len()` new views.
+            debug_assert_eq!(written, keys.len());
+            unsafe { output.views.set_len(output.views.len() + written) };
+            Ok(())
         })?;
 
-        if let Some(e) = error {
-            return Err(e);
-        }
         Ok(read)
     }