Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog.d/25216_gcp_cloud_storage_parquet.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Add Apache Parquet batch encoding support for the `gcp_cloud_storage` sink with flexible schema definitions.

Enable the `codecs-parquet` feature and configure `batch_encoding` with `codec = "parquet"` in the gcp_cloud_storage sink configuration.

authors: sonnens
163 changes: 135 additions & 28 deletions src/sinks/gcp/cloud_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ use indoc::indoc;
use snafu::{ResultExt, Snafu};
use tower::ServiceBuilder;
use uuid::Uuid;
#[cfg(feature = "codecs-parquet")]
use vector_lib::codecs::{BatchEncoder, encoding::BatchSerializerConfig};
use vector_lib::{
TimeZone,
codecs::encoding::Framer,
codecs::{EncoderKind, encoding::Framer},
configurable::configurable_component,
event::{EventFinalizers, Finalizable},
request_metadata::RequestMetadata,
Expand Down Expand Up @@ -151,6 +153,16 @@ pub struct GcsSinkConfig {
#[serde(flatten)]
encoding: EncodingConfigWithFraming,

/// Batch encoding configuration for columnar formats.
///
/// When set, events are encoded together as a batch in a columnar format (e.g., Parquet)
/// instead of the standard per-event framing-based encoding. The columnar format handles
/// its own internal compression, so the top-level `compression` setting is bypassed.
#[cfg(feature = "codecs-parquet")]
#[configurable(derived)]
#[serde(default)]
pub batch_encoding: Option<BatchSerializerConfig>,

/// Compression configuration.
///
/// All compression algorithms use the default compression level unless otherwise specified.
Expand Down Expand Up @@ -238,6 +250,8 @@ fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
content_encoding: Default::default(),
cache_control: Default::default(),
encoding,
#[cfg(feature = "codecs-parquet")]
batch_encoding: None,
compression: Compression::gzip_default(),
batch: Default::default(),
endpoint: Default::default(),
Expand Down Expand Up @@ -282,6 +296,10 @@ impl SinkConfig for GcsSinkConfig {
}

/// Declares which event data types this sink accepts.
///
/// When a columnar batch encoding (e.g. Parquet) is configured, the
/// accepted input type comes from that batch serializer config; otherwise
/// it comes from the standard per-event encoding config. In both cases the
/// result is intersected with `DataType::Log`, so the sink only ever
/// accepts log events.
fn input(&self) -> Input {
// Batch encoding takes precedence over the framed encoding config when set.
#[cfg(feature = "codecs-parquet")]
if let Some(batch_config) = &self.batch_encoding {
return Input::new(batch_config.input_type() & DataType::Log);
}
Input::new(self.encoding.config().1.input_type() & DataType::Log)
}

Expand All @@ -299,21 +317,79 @@ impl GcsSinkConfig {
cx: SinkContext,
) -> crate::Result<VectorSink> {
let request = self.request.into_settings();

let batch_settings = self.batch.into_batcher_settings()?;

let partitioner = self.key_partitioner()?;

let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());

let svc = ServiceBuilder::new()
.settings(request, GcsRetryLogic::default())
.service(GcsService::new(client, base_url, auth));

let request_settings = RequestSettings::new(self, cx)?;
#[cfg(feature = "codecs-parquet")]
if let Some(batch_config) = &self.batch_encoding {
if !matches!(batch_config, BatchSerializerConfig::Parquet(_)) {
return Err(
"batch_encoding only supports encoding with parquet format for gcp_cloud_storage sink"
.into(),
);
}

let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
let batch_serializer = batch_config.build_batch_serializer()?;
let batch_encoder = BatchEncoder::new(batch_serializer);

let content_type_str = self
.content_type
.clone()
.unwrap_or_else(|| batch_encoder.content_type().to_string());

let extension = self
.filename_extension
.clone()
.or_else(|| match batch_config {
BatchSerializerConfig::Parquet(_) => Some("parquet".to_string()),
#[allow(unreachable_patterns)]
_ => None,
})
.unwrap_or_else(|| self.compression.extension().into());

if self.compression != Compression::None {
warn!("Top level compression setting ignored when batch_encoding set to parquet.")
}

let request_settings = RequestSettings::new(
self,
cx,
EncoderKind::Batch(batch_encoder),
Compression::None,
extension,
content_type_str,
)?;

let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
return Ok(VectorSink::from_event_streamsink(sink));
}

let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
let enc = Encoder::<Framer>::new(framer, serializer);
let content_type_str = self
.content_type
.clone()
.unwrap_or_else(|| enc.content_type().to_string());
let extension = self
.filename_extension
.clone()
.unwrap_or_else(|| self.compression.extension().into());

let request_settings = RequestSettings::new(
self,
cx,
EncoderKind::Framed(Box::new(enc)),
self.compression,
extension,
content_type_str,
)?;

let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
Ok(VectorSink::from_event_streamsink(sink))
}

Expand All @@ -340,15 +416,15 @@ struct RequestSettings {
extension: String,
time_format: String,
append_uuid: bool,
encoder: (Transformer, Encoder<Framer>),
encoder: (Transformer, EncoderKind),
compression: Compression,
tz_offset: Option<FixedOffset>,
}

impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
type Metadata = (String, EventFinalizers);
type Events = Vec<Event>;
type Encoder = (Transformer, Encoder<Framer>);
type Encoder = (Transformer, EncoderKind);
type Payload = Bytes;
type Request = GcsRequest;
type Error = io::Error;
Expand Down Expand Up @@ -417,22 +493,23 @@ impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
}

impl RequestSettings {
fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
fn new(
config: &GcsSinkConfig,
cx: SinkContext,
encoder: EncoderKind,
compression: Compression,
extension: String,
content_type_str: String,
) -> crate::Result<Self> {
let transformer = config.encoding.transformer();
let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);

let acl = config
.acl
.map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
let content_type_str = config
.content_type
.as_deref()
.unwrap_or_else(|| encoder.content_type());
let content_type = HeaderValue::from_str(content_type_str)?;
let content_type = HeaderValue::from_str(&content_type_str)?;
let content_encoding = match &config.content_encoding {
Some(ce) => Some(HeaderValue::from_str(ce)?),
None => config
.compression
None => compression
.content_encoding()
.map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap()),
};
Expand All @@ -453,10 +530,6 @@ impl RequestSettings {
.collect::<Result<Vec<_>, _>>()
})
.unwrap_or_else(|| Ok(vec![]))?;
let extension = config
.filename_extension
.clone()
.unwrap_or_else(|| config.compression.extension().into());
let time_format = config.filename_time_format.clone();
let append_uuid = config.filename_append_uuid;
let offset = config
Expand All @@ -474,7 +547,7 @@ impl RequestSettings {
extension,
time_format,
append_uuid,
compression: config.compression,
compression,
encoder: (transformer, encoder),
tz_offset: offset,
})
Expand Down Expand Up @@ -502,8 +575,12 @@ mod tests {
request_metadata::GroupedCountByteSize,
};

use vector_lib::codecs::EncoderKind;
use vector_lib::codecs::encoding::Framer;

use super::*;
use crate::{
codecs::{Encoder, SinkType},
event::LogEvent,
test_util::{
components::{SINK_TAGS, run_and_assert_sink_compliance},
Expand Down Expand Up @@ -562,8 +639,35 @@ mod tests {
assert_eq!(key, "key: value");
}

/// Builds `RequestSettings` for the standard framed (per-event) encoding
/// path, mirroring what the sink construction does for non-batch encodings.
///
/// # Errors
///
/// Returns an error if the encoder cannot be built from the sink's encoding
/// config, or if `RequestSettings::new` rejects a configured value (e.g. an
/// invalid header).
fn try_request_settings(
    sink_config: &GcsSinkConfig,
    context: SinkContext,
) -> crate::Result<RequestSettings> {
    // Propagate encoder-build failures with `?` instead of panicking via
    // `.expect(...)`: this helper returns a Result precisely so callers
    // (the error-path tests below) can observe failures as `Err`.
    let (framer, serializer) = sink_config.encoding.build(SinkType::MessageBased)?;
    let enc = Encoder::<Framer>::new(framer, serializer);
    // An explicitly configured content type wins; otherwise fall back to
    // the encoder's own content type.
    let content_type_str = sink_config
        .content_type
        .clone()
        .unwrap_or_else(|| enc.content_type().to_string());
    // An explicitly configured extension wins; otherwise derive it from
    // the configured compression.
    let extension = sink_config
        .filename_extension
        .clone()
        .unwrap_or_else(|| sink_config.compression.extension().into());
    RequestSettings::new(
        sink_config,
        context,
        EncoderKind::Framed(Box::new(enc)),
        sink_config.compression,
        extension,
        content_type_str,
    )
}

/// Test convenience wrapper around [`try_request_settings`] that panics if
/// the settings cannot be built.
fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings {
    let settings = try_request_settings(sink_config, context);
    settings.expect("Could not create request settings")
}

fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
Expand Down Expand Up @@ -654,10 +758,13 @@ mod tests {
let sink_config = GcsSinkConfig {
// Invalid header value with newline character
content_type: Some("text/plain\nInvalid".to_string()),
..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
..default_config(EncodingConfigWithFraming::from((
None::<FramingConfig>,
TextSerializerConfig::default(),
)))
};

let result = RequestSettings::new(&sink_config, context);
let result = try_request_settings(&sink_config, context);
// Should return an error, not panic
assert!(result.is_err());
}
Expand Down Expand Up @@ -719,7 +826,7 @@ mod tests {
..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
};

let result = RequestSettings::new(&sink_config, context);
let result = try_request_settings(&sink_config, context);
// Should return an error, not panic
assert!(result.is_err());
}
Expand Down Expand Up @@ -779,7 +886,7 @@ mod tests {
..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
};

let result = RequestSettings::new(&sink_config, context);
let result = try_request_settings(&sink_config, context);
// Should return an error, not panic
assert!(result.is_err());
}
Expand Down
Loading