Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions arrow-integration-testing/tests/ipc_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ fn write_2_0_0_compression() {
let all_options = [
IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
.unwrap()
.try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
.try_with_compression(Some(ipc::writer::IpcCompression::Lz4Frame))
.unwrap(),
// write IPC version 5 with zstd
IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
.unwrap()
.try_with_compression(Some(ipc::CompressionType::ZSTD))
.try_with_compression(Some(ipc::writer::IpcCompression::zstd_default()))
.unwrap(),
];

Expand Down
8 changes: 4 additions & 4 deletions arrow-ipc/benches/ipc_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use arrow_array::{RecordBatch, builder::StringBuilder};
use arrow_buffer::Buffer;
use arrow_ipc::convert::fb_to_schema;
use arrow_ipc::reader::{FileDecoder, FileReader, StreamReader, read_footer_length};
use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
use arrow_ipc::{Block, CompressionType, root_as_footer};
use arrow_ipc::writer::{FileWriter, IpcCompression, IpcWriteOptions, StreamWriter};
use arrow_ipc::{Block, root_as_footer};
use arrow_schema::{DataType, Field, Schema};
use criterion::{Criterion, criterion_group, criterion_main};
use std::io::{Cursor, Write};
Expand Down Expand Up @@ -62,7 +62,7 @@ fn criterion_benchmark(c: &mut Criterion) {
group.bench_function("StreamReader/read_10/zstd", |b| {
let buffer = ipc_stream(
IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::ZSTD))
.try_with_compression(Some(IpcCompression::zstd_default()))
.unwrap(),
);
b.iter(move || {
Expand All @@ -78,7 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) {
group.bench_function("StreamReader/no_validation/read_10/zstd", |b| {
let buffer = ipc_stream(
IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::ZSTD))
.try_with_compression(Some(IpcCompression::zstd_default()))
.unwrap(),
);
b.iter(move || {
Expand Down
5 changes: 2 additions & 3 deletions arrow-ipc/benches/ipc_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
use arrow_array::{RecordBatch, builder::StringBuilder};
use arrow_ipc::CompressionType;
use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
use arrow_ipc::writer::{FileWriter, IpcCompression, IpcWriteOptions, StreamWriter};
use arrow_schema::{DataType, Field, Schema};
use criterion::{Criterion, criterion_group, criterion_main};
use std::sync::Arc;
Expand All @@ -45,7 +44,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(move || {
buffer.clear();
let options = IpcWriteOptions::default()
.try_with_compression(Some(CompressionType::ZSTD))
.try_with_compression(Some(IpcCompression::zstd_default()))
.unwrap();
let mut writer =
StreamWriter::try_new_with_options(&mut buffer, batch.schema().as_ref(), options)
Expand Down
90 changes: 90 additions & 0 deletions arrow-ipc/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,44 @@ use arrow_schema::ArrowError;
const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
const LENGTH_OF_PREFIX_DATA: i64 = 8;

/// A zstd compression level that has already been range-checked.
///
/// The wrapped `i32` is private, so code outside this module can only obtain a
/// value through [`ZstdLevel::try_new`] (which validates the level) or
/// [`ZstdLevel::default`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ZstdLevel(i32);

impl ZstdLevel {
// zstd binds to C, and hence zstd::compression_level_range() is not const as this calls the
// underlying C library.
const MINIMUM_LEVEL: i32 = 1;
const MAXIMUM_LEVEL: i32 = 22;

/// Attempts to create a zstd compression level from a given compression level.
///
/// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`]).
pub fn try_new(level: i32) -> Result<Self, ArrowError> {
let range = Self::MINIMUM_LEVEL..=Self::MAXIMUM_LEVEL;
if range.contains(&level) {
Ok(Self(level))
} else {
Err(ArrowError::InvalidArgumentError(format!(
"valid compression range {}..={} exceeded.",
range.start(),
range.end(),
)))
}
}

/// Returns the compression level.
pub fn compression_level(&self) -> i32 {
self.0
}
}

impl Default for ZstdLevel {
    /// Returns compression level `1`.
    ///
    /// NOTE(review): the zstd crate's own `DEFAULT_COMPRESSION_LEVEL` is 3, not 1 —
    /// confirm that level 1 is the intended default for IPC writers.
    fn default() -> Self {
        const DEFAULT_LEVEL: i32 = 1;
        Self(DEFAULT_LEVEL)
    }
}

/// Additional context that may be needed for compression.
///
/// In the case of zstd, this will contain the zstd context, which can be reused between subsequent
Expand All @@ -46,6 +84,30 @@ impl Default for CompressionContext {
}
}

impl CompressionContext {
    /// Builds a [`CompressionContext`] whose zstd compressor is configured with `level`.
    ///
    /// * With the `zstd` cargo feature enabled, a new `zstd::bulk::Compressor` is
    ///   created at `level.compression_level()`.
    /// * Without the `zstd` feature, `level` is discarded and the result is
    ///   [`CompressionContext::default`].
    ///
    /// # Panics
    ///
    /// With the `zstd` feature enabled, panics if the zstd library fails to create a
    /// compressor for `level`. This is not expected in practice, since a
    /// [`ZstdLevel`] only ever holds a level accepted by [`ZstdLevel::try_new`].
    pub fn with_zstd_level(level: ZstdLevel) -> Self {
        #[cfg(not(feature = "zstd"))]
        {
            // No zstd support compiled in: consume the argument so it is not
            // reported as unused, then fall back to the default context.
            let _ = level;
            Self::default()
        }
        #[cfg(feature = "zstd")]
        {
            let compressor = zstd::bulk::Compressor::new(level.compression_level())
                .expect("zstd level was validated by ZstdLevel::try_new");
            Self { compressor }
        }
    }
}

impl std::fmt::Debug for CompressionContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("CompressionContext");
Expand Down Expand Up @@ -115,6 +177,33 @@ impl TryFrom<CompressionType> for CompressionCodec {
}
}

/// Compression selection for record-batch bodies written to an Arrow IPC
/// stream or file, including any codec-specific parameters.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum IpcCompression {
    /// LZ4 frame compression (needs the `lz4` cargo feature).
    Lz4Frame,
    /// zstd compression at the carried [`ZstdLevel`] (needs the `zstd` cargo feature).
    Zstd(ZstdLevel),
}

impl IpcCompression {
    /// Shorthand for `IpcCompression::Zstd(ZstdLevel::default())`.
    ///
    /// Intended as the replacement for the former
    /// `try_with_compression(Some(CompressionType::ZSTD))` call.
    ///
    /// NOTE(review): confirm that [`ZstdLevel::default`] matches the level the
    /// writer used before this type existed.
    pub fn zstd_default() -> Self {
        let level = ZstdLevel::default();
        Self::Zstd(level)
    }

    /// Maps this selection to its on-wire [`crate::CompressionType`].
    ///
    /// Codec parameters such as the zstd level are not part of the returned value.
    pub fn codec(self) -> CompressionType {
        match self {
            Self::Zstd(..) => CompressionType::ZSTD,
            Self::Lz4Frame => CompressionType::LZ4_FRAME,
        }
    }
}

impl CompressionCodec {
/// Compresses the data in `input` to `output` and appends the
/// data using the specified compression mechanism.
Expand Down Expand Up @@ -357,4 +446,5 @@ mod tests {
.unwrap();
assert_eq!(input_bytes, result.as_slice());
}

}
Loading
Loading