Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 60 additions & 45 deletions parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ use bytes::Bytes;
use crate::errors::{ParquetError, Result};
use crate::util::bit_util::{self, BitReader, BitWriter, FromBitpacked};

/// Maximum groups of 8 values per bit-packed run. Current value is 64.
/// Number of values in one bit-packed group. The Parquet RLE/bit-packing hybrid
/// format always bit-packs values in multiples of this count (see the
/// [format spec](https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3):
/// "we always bit-pack a multiple of 8 values at a time").
const BIT_PACK_GROUP_SIZE: usize = 8;

/// Maximum groups of `BIT_PACK_GROUP_SIZE` values per bit-packed run. Current value is 64.
const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6;

/// A RLE/Bit-Packing hybrid encoder.
Expand All @@ -54,9 +60,9 @@ pub struct RleEncoder {
bit_writer: BitWriter,

// Buffered values for bit-packed runs.
buffered_values: [u64; 8],
buffered_values: [u64; BIT_PACK_GROUP_SIZE],

// Number of current buffered values. Must be less than 8.
// Number of current buffered values. Must be less than BIT_PACK_GROUP_SIZE.
num_buffered_values: usize,

// The current (also last) value that was written and the count of how many
Expand Down Expand Up @@ -89,7 +95,7 @@ impl RleEncoder {
RleEncoder {
bit_width,
bit_writer,
buffered_values: [0; 8],
buffered_values: [0; BIT_PACK_GROUP_SIZE],
num_buffered_values: 0,
current_value: 0,
repeat_count: 0,
Expand All @@ -101,22 +107,23 @@ impl RleEncoder {
/// Returns the maximum buffer size to encode `num_values` values with
/// `bit_width`.
pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
// The maximum size occurs with the shortest possible runs of 8
let num_runs = bit_util::ceil(num_values, 8);
// The maximum size occurs with the shortest possible runs of BIT_PACK_GROUP_SIZE
let num_runs = bit_util::ceil(num_values, BIT_PACK_GROUP_SIZE);

// The number of bytes in a run of 8
// The number of bytes in a run of BIT_PACK_GROUP_SIZE
let bytes_per_run = bit_width as usize;

// The maximum size if stored as shortest possible bit packed runs of 8
// The maximum size if stored as shortest possible bit packed runs of BIT_PACK_GROUP_SIZE
let bit_packed_max_size = num_runs + num_runs * bytes_per_run;

// The length of `8` VLQ encoded
// The length of `BIT_PACK_GROUP_SIZE` VLQ encoded
let rle_len_prefix = 1;

// The length of an RLE run of 8
let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as usize, 8);
// The length of an RLE run of BIT_PACK_GROUP_SIZE
let min_rle_run_size =
rle_len_prefix + bit_util::ceil(bit_width as usize, u8::BITS as usize);

// The maximum size if stored as shortest possible RLE runs of 8
// The maximum size if stored as shortest possible RLE runs of BIT_PACK_GROUP_SIZE
let rle_max_size = num_runs * min_rle_run_size;

bit_packed_max_size.max(rle_max_size)
Expand All @@ -125,16 +132,17 @@ impl RleEncoder {
/// Encodes `value`, which must be representable with `bit_width` bits.
#[inline]
pub fn put(&mut self, value: u64) {
// This function buffers 8 values at a time. After seeing 8 values, it
// decides whether the current run should be encoded in bit-packed or RLE.
// This function buffers BIT_PACK_GROUP_SIZE values at a time. After seeing that
// many values, it decides whether the current run should be encoded in bit-packed
// or RLE.
if self.current_value == value {
self.repeat_count += 1;
if self.repeat_count > 8 {
if self.repeat_count > BIT_PACK_GROUP_SIZE {
// A continuation of last value. No need to buffer.
return;
}
} else {
if self.repeat_count >= 8 {
if self.repeat_count >= BIT_PACK_GROUP_SIZE {
// The current RLE run has ended and we've gathered enough. Flush first.
debug_assert_eq!(self.bit_packed_count, 0);
self.flush_rle_run();
Expand All @@ -145,9 +153,9 @@ impl RleEncoder {

self.buffered_values[self.num_buffered_values] = value;
self.num_buffered_values += 1;
if self.num_buffered_values == 8 {
if self.num_buffered_values == BIT_PACK_GROUP_SIZE {
// Buffered values are full. Flush them.
debug_assert_eq!(self.bit_packed_count % 8, 0);
debug_assert_eq!(self.bit_packed_count % BIT_PACK_GROUP_SIZE, 0);
self.flush_buffered_values();
}
}
Expand Down Expand Up @@ -219,9 +227,9 @@ impl RleEncoder {
if self.repeat_count > 0 && all_repeat {
self.flush_rle_run();
} else {
// Buffer the last group of bit-packed values to 8 by padding with 0s.
// Buffer the last group of bit-packed values to BIT_PACK_GROUP_SIZE by padding with 0s.
if self.num_buffered_values > 0 {
while self.num_buffered_values < 8 {
while self.num_buffered_values < BIT_PACK_GROUP_SIZE {
self.buffered_values[self.num_buffered_values] = 0;
self.num_buffered_values += 1;
}
Expand All @@ -239,7 +247,7 @@ impl RleEncoder {
self.bit_writer.put_vlq_int(indicator_value as u64);
self.bit_writer.put_aligned(
self.current_value,
bit_util::ceil(self.bit_width as usize, 8),
bit_util::ceil(self.bit_width as usize, u8::BITS as usize),
);
self.num_buffered_values = 0;
self.repeat_count = 0;
Expand All @@ -263,7 +271,7 @@ impl RleEncoder {
// Called when ending a bit-packed run. Writes the indicator byte to the reserved
// position in `bit_writer`
fn finish_bit_packed_run(&mut self) {
let num_groups = self.bit_packed_count / 8;
let num_groups = self.bit_packed_count / BIT_PACK_GROUP_SIZE;
let indicator_byte = ((num_groups << 1) | 1) as u8;
self.bit_writer
.put_aligned_offset(indicator_byte, 1, self.indicator_byte_pos as usize);
Expand All @@ -272,20 +280,20 @@ impl RleEncoder {
}

fn flush_buffered_values(&mut self) {
if self.repeat_count >= 8 {
if self.repeat_count >= BIT_PACK_GROUP_SIZE {
// Clear buffered values as they are not needed
self.num_buffered_values = 0;
if self.bit_packed_count > 0 {
// In this case we have chosen to switch to RLE encoding. Close out the
// previous bit-packed run.
debug_assert_eq!(self.bit_packed_count % 8, 0);
debug_assert_eq!(self.bit_packed_count % BIT_PACK_GROUP_SIZE, 0);
self.finish_bit_packed_run();
}
return;
}

self.bit_packed_count += self.num_buffered_values;
let num_groups = self.bit_packed_count / 8;
let num_groups = self.bit_packed_count / BIT_PACK_GROUP_SIZE;
if num_groups + 1 >= MAX_GROUPS_PER_BIT_PACKED_RUN {
// We've reached the maximum number of groups that can be held in a single
// bit-packed run.
Expand Down Expand Up @@ -359,7 +367,7 @@ impl RleDecoder {
#[inline(never)]
#[allow(unused)]
pub fn get<T: FromBitpacked>(&mut self) -> Result<Option<T>> {
assert!(size_of::<T>() <= 8);
assert!(size_of::<T>() <= size_of::<u64>());

while self.rle_left == 0 && self.bit_packed_left == 0 {
if !self.reload()? {
Expand Down Expand Up @@ -395,7 +403,7 @@ impl RleDecoder {

#[inline(never)]
pub fn get_batch<T: FromBitpacked + Clone>(&mut self, buffer: &mut [T]) -> Result<usize> {
assert!(size_of::<T>() <= 8);
assert!(size_of::<T>() <= size_of::<u64>());

let mut values_read = 0;
while values_read < buffer.len() {
Expand Down Expand Up @@ -516,8 +524,8 @@ impl RleDecoder {
{
let out = &mut buffer[values_read..values_read + num_values];
let idx = &index_buf[..num_values];
let mut out_chunks = out.chunks_exact_mut(8);
let idx_chunks = idx.chunks_exact(8);
let mut out_chunks = out.chunks_exact_mut(BIT_PACK_GROUP_SIZE);
let idx_chunks = idx.chunks_exact(BIT_PACK_GROUP_SIZE);
for (out_chunk, idx_chunk) in out_chunks.by_ref().zip(idx_chunks) {
let dict_len = dict.len();
assert!(
Expand All @@ -532,7 +540,7 @@ impl RleDecoder {
for (b, i) in out_chunks
.into_remainder()
.iter_mut()
.zip(idx.chunks_exact(8).remainder().iter())
.zip(idx.chunks_exact(BIT_PACK_GROUP_SIZE).remainder().iter())
{
b.clone_from(&dict[*i as usize]);
}
Expand Down Expand Up @@ -566,10 +574,10 @@ impl RleDecoder {
return Ok(false);
}
if indicator_value & 1 == 1 {
self.bit_packed_left = ((indicator_value >> 1) * 8) as u32;
self.bit_packed_left = ((indicator_value >> 1) * BIT_PACK_GROUP_SIZE as i64) as u32;
} else {
self.rle_left = (indicator_value >> 1) as u32;
let value_width = bit_util::ceil(self.bit_width as usize, 8);
let value_width = bit_util::ceil(self.bit_width as usize, u8::BITS as usize);
self.current_value = bit_reader.get_aligned::<u64>(value_width);
self.current_value.ok_or_else(|| {
general_err!("parquet_data_error: not enough data for RLE decoding")
Expand Down Expand Up @@ -598,7 +606,7 @@ mod tests {
let data = vec![0x03, 0x88, 0xC6, 0xFA];
let mut decoder: RleDecoder = RleDecoder::new(3);
decoder.set_data(data.into()).unwrap();
let mut buffer = vec![0; 8];
let mut buffer = vec![0; BIT_PACK_GROUP_SIZE];
let expected = vec![0, 1, 2, 3, 4, 5, 6, 7];
let result = decoder.get_batch::<i32>(&mut buffer);
assert!(result.is_ok());
Expand Down Expand Up @@ -782,14 +790,18 @@ mod tests {
let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
let mut decoder: RleDecoder = RleDecoder::new(3);
decoder.set_data(data.into()).unwrap();
let mut buffer = vec![""; 8];
let mut buffer = vec![""; BIT_PACK_GROUP_SIZE];
let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"];
let skipped = decoder.skip(4).expect("skipping four values");
assert_eq!(skipped, 4);
let remainder = decoder
.get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 8)
.get_batch_with_dict::<&str>(
dict.as_slice(),
buffer.as_mut_slice(),
BIT_PACK_GROUP_SIZE,
)
.expect("getting remainder");
assert_eq!(remainder, 8);
assert_eq!(remainder, BIT_PACK_GROUP_SIZE);
assert_eq!(buffer, expected);
}

Expand Down Expand Up @@ -851,7 +863,7 @@ mod tests {
&values[..],
width as u8,
None,
2 * (1 + bit_util::ceil(width as i64, 8) as i32),
2 * (1 + bit_util::ceil(width as i64, u8::BITS as i64) as i32),
);
}

Expand All @@ -861,9 +873,12 @@ mod tests {
for i in 0..101 {
values.push(i % 2);
}
let num_groups = bit_util::ceil(100, 8) as u8;
let num_groups = bit_util::ceil(100, BIT_PACK_GROUP_SIZE) as u8;
expected_buffer.push((num_groups << 1) | 1);
expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010);
expected_buffer.resize(
expected_buffer.len() + 100 / BIT_PACK_GROUP_SIZE,
0b10101010,
);

// For the last 4 0 and 1's, padded with 0.
expected_buffer.push(0b00001010);
Expand All @@ -874,12 +889,12 @@ mod tests {
1 + num_groups as i32,
);
for width in 2..MAX_WIDTH + 1 {
let num_values = bit_util::ceil(100, 8) * 8;
let num_values = bit_util::ceil(100, BIT_PACK_GROUP_SIZE) * BIT_PACK_GROUP_SIZE;
validate_rle(
&values,
width as u8,
None,
1 + bit_util::ceil(width as i64 * num_values, 8) as i32,
1 + bit_util::ceil(width as i64 * num_values as i64, u8::BITS as i64) as i32,
);
}
}
Expand Down Expand Up @@ -973,9 +988,9 @@ mod tests {
.get_batch(&mut actual_values)
.expect("get_batch() should be OK");

// Should decode 8 values despite only encoding 6 as length of
// bit packed run is always multiple of 8
assert_eq!(r, 8);
// Should decode BIT_PACK_GROUP_SIZE values despite only encoding 6 as length of
// bit packed run is always a multiple of BIT_PACK_GROUP_SIZE
assert_eq!(r, BIT_PACK_GROUP_SIZE);
assert_eq!(actual_values[..6], values);
assert_eq!(actual_values[6], 0);
assert_eq!(actual_values[7], 0);
Expand All @@ -996,7 +1011,7 @@ mod tests {
let num_values = 2002;

// bit-packed header
let run_bytes = ceil(num_values * bit_width, 8) as u64;
let run_bytes = ceil(num_values * bit_width, u8::BITS as usize) as u64;
writer.put_vlq_int((run_bytes << 1) | 1);
for _ in 0..run_bytes {
writer.put_aligned(0xFF_u8, 1);
Expand Down
Loading