From fda912d95f4b8353349022b88e629f6044ba551b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 16 Apr 2026 20:10:12 +0200 Subject: [PATCH 1/4] Speed up ByteView dictionary decoder with chunks_exact gather MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the `extend(keys.iter().map(...))` loop in `ByteViewArrayDecoderDictionary::read` with a `chunks_exact(8)` loop that bulk-validates each chunk's keys, then uses `get_unchecked` gather plus raw-pointer writes. Matches the pattern in `RleDecoder::get_batch_with_dict`. Drops per-element bounds check, per-element `error.is_none()` branch, and `Vec::extend`'s per-push capacity check. Invalid keys now return an error eagerly via a cold helper instead of zero-filling and deferring. Dictionary-decode microbenchmarks (parquet/benches/arrow_reader.rs): BinaryView mandatory, no NULLs 102.91 µs -> 74.29 µs -27.8% BinaryView optional, no NULLs 104.63 µs -> 76.65 µs -26.9% BinaryView optional, half NULLs 143.25 µs -> 132.46 µs -7.3% StringView mandatory, no NULLs 105.98 µs -> 73.87 µs -28.8% StringView optional, no NULLs 104.62 µs -> 76.34 µs -27.4% StringView optional, half NULLs 141.86 µs -> 131.85 µs -7.1% Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/arrow/array_reader/byte_view_array.rs | 137 +++++++++++++----- 1 file changed, 100 insertions(+), 37 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 1933654118f3..6f94a725d3f9 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -450,6 +450,39 @@ impl ByteViewArrayDecoderPlain { } } +/// Rewrite `view`'s buffer index by `base` when the dictionary buffers were +/// appended later than position 0 in the output buffer list. Inlined views +/// (length ≤ 12) carry their data in the high 96 bits and must be copied +/// verbatim. +#[inline(always)] +fn adjust_buffer_index(view: u128, base: u32) -> u128 { + let len = view as u32; + if len <= 12 { + view + } else { + let mut bv = ByteView::from(view); + bv.buffer_index += base; + bv.into() + } +} + +/// Slow-path error constructor for a chunk whose validity check failed. Kept +/// out of the hot loop so the fast path stays small. +#[cold] +#[inline(never)] +fn invalid_dict_key(chunk: &[i32], dict_len: usize) -> ParquetError { + let bad = chunk + .iter() + .copied() + .find(|&k| (k as usize) >= dict_len) + .unwrap_or(0); + general_err!( + "invalid key={} for dictionary of length {}", + bad, + dict_len + ) +} + pub struct ByteViewArrayDecoderDictionary { decoder: DictIndexDecoder, } @@ -500,52 +533,82 @@ impl ByteViewArrayDecoderDictionary { // then the base_buffer_idx is 5 - 2 = 3 let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32; - // Pre-reserve output capacity to avoid per-chunk reallocation in extend + // Pre-reserve output capacity so the gather loop can write through a raw + // pointer without `Vec::extend`'s per-element capacity checks. output.views.reserve(len); - let mut error = None; + let dict_views: &[u128] = dict.views.as_slice(); + let dict_len = dict_views.len(); + let read = self.decoder.read(len, |keys| { + // SAFETY: `output.views.reserve(len)` was called above and the + // outer loop ensures we never write more than `len` views. + let out_ptr = unsafe { output.views.as_mut_ptr().add(output.views.len()) }; + + // Process 8-key chunks with a bulk validity check that the compiler + // can autovectorise, then use `get_unchecked` in the gather loop. + // Mirrors the pattern in `RleDecoder::get_batch_with_dict`. + const CHUNK: usize = 8; + let mut chunks = keys.chunks_exact(CHUNK); + let mut written = 0usize; + if base_buffer_idx == 0 { - // the dictionary buffers are the last buffers in output, we can directly use the views - output - .views - .extend(keys.iter().map(|k| match dict.views.get(*k as usize) { - Some(&view) => view, - None => { - if error.is_none() { - error = Some(general_err!("invalid key={} for dictionary", *k)); - } - 0 + for chunk in chunks.by_ref() { + if !chunk.iter().all(|&k| (k as usize) < dict_len) { + return Err(invalid_dict_key(chunk, dict_len)); + } + for (i, &k) in chunk.iter().enumerate() { + // SAFETY: bounds checked above. + unsafe { + let view = *dict_views.get_unchecked(k as usize); + out_ptr.add(written + i).write(view); } - })); - Ok(()) + } + written += CHUNK; + } + for &k in chunks.remainder() { + let view = *dict_views + .get(k as usize) + .ok_or_else(|| general_err!("invalid key={k} for dictionary"))?; + // SAFETY: remainder writes stay within the reserved range. + unsafe { out_ptr.add(written).write(view) }; + written += 1; + } } else { - output - .views - .extend(keys.iter().map(|k| match dict.views.get(*k as usize) { - Some(&view) => { - let len = view as u32; - if len <= 12 { - view - } else { - let mut view = ByteView::from(view); - view.buffer_index += base_buffer_idx; - view.into() - } + for chunk in chunks.by_ref() { + if !chunk.iter().all(|&k| (k as usize) < dict_len) { + return Err(invalid_dict_key(chunk, dict_len)); + } + for (i, &k) in chunk.iter().enumerate() { + // SAFETY: bounds checked above. + unsafe { + let view = *dict_views.get_unchecked(k as usize); + out_ptr + .add(written + i) + .write(adjust_buffer_index(view, base_buffer_idx)); } - None => { - if error.is_none() { - error = Some(general_err!("invalid key={} for dictionary", *k)); - } - 0 - } - })); - Ok(()) + } + written += CHUNK; + } + for &k in chunks.remainder() { + let view = *dict_views + .get(k as usize) + .ok_or_else(|| general_err!("invalid key={k} for dictionary"))?; + // SAFETY: remainder writes stay within the reserved range. + unsafe { + out_ptr + .add(written) + .write(adjust_buffer_index(view, base_buffer_idx)) + }; + written += 1; + } } + + // SAFETY: we wrote exactly `written == keys.len()` new views. + debug_assert_eq!(written, keys.len()); + unsafe { output.views.set_len(output.views.len() + written) }; + Ok(()) })?; - if let Some(e) = error { - return Err(e); - } Ok(read) } From fe1728d50ca382bb522b999ad306041ad186e3e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 16 Apr 2026 20:20:54 +0200 Subject: [PATCH 2/4] Branchless adjust_buffer_index and SIMD-friendly bounds check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two small follow-ups to the chunked-gather rewrite, both driven by inspecting the aarch64 asm: 1) Rewrite `adjust_buffer_index` without an `if/else` so LLVM emits a `csel` in the hot chunked loop. Previously the main 8-key gather went through an out-of-line block with a conditional branch per view; now each view is 5 branchless instructions (ldp/cmp/csel/ add/stp). 2) Replace `chunk.iter().all(|&k| cond)` with a max-reduction over `u32` keys. `.all()` short-circuits, which blocks vectorisation — LLVM emitted 8 sequential `ldrsw+cmp+b.ls`. The max-reduction compiles on aarch64 NEON to: ldp q1, q0, [x1] ; one load, 8 keys umax.4s v2, v1, v0 ; pairwise lane max umaxv.4s s2, v2 ; horizontal reduce cmp w13, w22 ; one compare b.hs ; one branch The NEON registers are then reused for the gather (`fmov`/`mov.s v[i]`) so keys are loaded exactly once. Casting keys via `k as u32` correctly rejects any negative i32 (corrupt data) because a negative value becomes a large u32. Microbenchmark deltas over the previous commit (criterion, aarch64): BinaryView mandatory, no NULLs 74.29 µs -> 72.96 µs -1.8% BinaryView optional, no NULLs 76.65 µs -> 75.01 µs -2.1% StringView mandatory, no NULLs 73.87 µs -> 72.27 µs -2.2% StringView optional, no NULLs 76.34 µs -> 75.41 µs -1.2% Cumulative vs. main HEAD (89b1497484): BinaryView mandatory, no NULLs 102.91 µs -> 72.96 µs -29.2% BinaryView optional, no NULLs 104.63 µs -> 75.01 µs -28.4% BinaryView optional, half NULLs 143.25 µs -> 133.06 µs -7.4% StringView mandatory, no NULLs 105.98 µs -> 72.27 µs -30.7% StringView optional, no NULLs 104.62 µs -> 75.41 µs -29.2% StringView optional, half NULLs 141.86 µs -> 132.20 µs -6.8% Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/arrow/array_reader/byte_view_array.rs | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 6f94a725d3f9..a95d009f548d 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -453,17 +453,13 @@ impl ByteViewArrayDecoderPlain { /// Rewrite `view`'s buffer index by `base` when the dictionary buffers were /// appended later than position 0 in the output buffer list. Inlined views /// (length ≤ 12) carry their data in the high 96 bits and must be copied -/// verbatim. +/// verbatim. Written branchlessly so LLVM emits `csel`/`cmov` inside the +/// hot chunked gather loop instead of a per-view conditional branch. #[inline(always)] fn adjust_buffer_index(view: u128, base: u32) -> u128 { - let len = view as u32; - if len <= 12 { - view - } else { - let mut bv = ByteView::from(view); - bv.buffer_index += base; - bv.into() - } + // View layout: bits [0..32] = len, [64..96] = buffer_index (long-view only). + let is_long = ((view as u32) > 12) as u128; + view.wrapping_add((is_long * base as u128) << 64) } /// Slow-path error constructor for a chunk whose validity check failed. Kept @@ -551,10 +547,21 @@ impl ByteViewArrayDecoderDictionary { const CHUNK: usize = 8; let mut chunks = keys.chunks_exact(CHUNK); let mut written = 0usize; + // Cast to u32 so that any negative i32 (corrupt data) compares as a + // very large value and fails the check. + let dict_len_u32 = dict_len as u32; if base_buffer_idx == 0 { for chunk in chunks.by_ref() { - if !chunk.iter().all(|&k| (k as usize) < dict_len) { + // Branchless max-reduction over 8 keys: LLVM emits a SIMD + // umax sequence on aarch64/x86_64 instead of the short- + // circuited `.all()` form which compiles to a chain of + // per-key `cmp + b.ls`. + let mut max_key = 0u32; + for &k in chunk { + max_key = max_key.max(k as u32); + } + if max_key >= dict_len_u32 { return Err(invalid_dict_key(chunk, dict_len)); } for (i, &k) in chunk.iter().enumerate() { @@ -576,7 +583,11 @@ impl ByteViewArrayDecoderDictionary { } } else { for chunk in chunks.by_ref() { - if !chunk.iter().all(|&k| (k as usize) < dict_len) { + let mut max_key = 0u32; + for &k in chunk { + max_key = max_key.max(k as u32); + } + if max_key >= dict_len_u32 { return Err(invalid_dict_key(chunk, dict_len)); } for (i, &k) in chunk.iter().enumerate() { From 90e095bcaa5b26dc37571fc2238ec2aee2316c6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 17 Apr 2026 06:22:09 +0200 Subject: [PATCH 3/4] Remove unused ByteView import Co-Authored-By: Claude Opus 4.7 (1M context) --- parquet/src/arrow/array_reader/byte_view_array.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index a95d009f548d..c143fd35fdb4 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -30,7 +30,6 @@ use crate::schema::types::ColumnDescPtr; use crate::util::utf8::check_valid_utf8; use arrow_array::{ArrayRef, builder::make_view}; use arrow_buffer::Buffer; -use arrow_data::ByteView; use arrow_schema::DataType as ArrowType; use bytes::Bytes; use std::any::Any; From 0fa7d13ca74edb09b9f5e8ca23cf08fb43dcd5e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 17 Apr 2026 06:39:23 +0200 Subject: [PATCH 4/4] Use fold for max-key reduction in dict gather Co-Authored-By: Claude Opus 4.7 (1M context) --- parquet/src/arrow/array_reader/byte_view_array.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index c143fd35fdb4..91970f79f172 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -556,10 +556,7 @@ impl ByteViewArrayDecoderDictionary { // umax sequence on aarch64/x86_64 instead of the short- // circuited `.all()` form which compiles to a chain of // per-key `cmp + b.ls`. - let mut max_key = 0u32; - for &k in chunk { - max_key = max_key.max(k as u32); - } + let max_key = chunk.iter().fold(0u32, |acc, &k| acc.max(k as u32)); if max_key >= dict_len_u32 { return Err(invalid_dict_key(chunk, dict_len)); } @@ -582,10 +579,7 @@ impl ByteViewArrayDecoderDictionary { } } else { for chunk in chunks.by_ref() { - let mut max_key = 0u32; - for &k in chunk { - max_key = max_key.max(k as u32); - } + let max_key = chunk.iter().fold(0u32, |acc, &k| acc.max(k as u32)); if max_key >= dict_len_u32 { return Err(invalid_dict_key(chunk, dict_len)); }