diff --git a/changelog.d/25216_gcp_cloud_storage_parquet.feature.md b/changelog.d/25216_gcp_cloud_storage_parquet.feature.md new file mode 100644 index 0000000000000..db34ad56ab471 --- /dev/null +++ b/changelog.d/25216_gcp_cloud_storage_parquet.feature.md @@ -0,0 +1,5 @@ +Add Apache Parquet batch encoding support for the `gcp_cloud_storage` sink with flexible schema definitions. + +Enable the `codecs-parquet` feature and configure `batch_encoding` with `codec = "parquet"` in the gcp_cloud_storage sink configuration. + +authors: sonnens diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index c63abdb6689f9..89cb37d4c31e1 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -10,9 +10,11 @@ use indoc::indoc; use snafu::{ResultExt, Snafu}; use tower::ServiceBuilder; use uuid::Uuid; +#[cfg(feature = "codecs-parquet")] +use vector_lib::codecs::{BatchEncoder, encoding::BatchSerializerConfig}; use vector_lib::{ TimeZone, - codecs::encoding::Framer, + codecs::{EncoderKind, encoding::Framer}, configurable::configurable_component, event::{EventFinalizers, Finalizable}, request_metadata::RequestMetadata, @@ -151,6 +153,16 @@ pub struct GcsSinkConfig { #[serde(flatten)] encoding: EncodingConfigWithFraming, + /// Batch encoding configuration for columnar formats. + /// + /// When set, events are encoded together as a batch in a columnar format (e.g., Parquet) + /// instead of the standard per-event framing-based encoding. The columnar format handles + /// its own internal compression, so the top-level `compression` setting is bypassed. + #[cfg(feature = "codecs-parquet")] + #[configurable(derived)] + #[serde(default)] + pub batch_encoding: Option, + /// Compression configuration. /// /// All compression algorithms use the default compression level unless otherwise specified. @@ -238,6 +250,8 @@ fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig { content_encoding: Default::default(), cache_control: Default::default(), encoding, + #[cfg(feature = "codecs-parquet")] + batch_encoding: None, compression: Compression::gzip_default(), batch: Default::default(), endpoint: Default::default(), @@ -282,6 +296,10 @@ impl SinkConfig for GcsSinkConfig { } fn input(&self) -> Input { + #[cfg(feature = "codecs-parquet")] + if let Some(batch_config) = &self.batch_encoding { + return Input::new(batch_config.input_type() & DataType::Log); + } Input::new(self.encoding.config().1.input_type() & DataType::Log) } @@ -299,21 +317,79 @@ impl GcsSinkConfig { cx: SinkContext, ) -> crate::Result { let request = self.request.into_settings(); - let batch_settings = self.batch.into_batcher_settings()?; - let partitioner = self.key_partitioner()?; - let protocol = get_http_scheme_from_uri(&base_url.parse::().unwrap()); let svc = ServiceBuilder::new() .settings(request, GcsRetryLogic::default()) .service(GcsService::new(client, base_url, auth)); - let request_settings = RequestSettings::new(self, cx)?; + #[cfg(feature = "codecs-parquet")] + if let Some(batch_config) = &self.batch_encoding { + if !matches!(batch_config, BatchSerializerConfig::Parquet(_)) { + return Err( + "batch_encoding only supports encoding with parquet format for gcp_cloud_storage sink" + .into(), + ); + } - let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol); + let batch_serializer = batch_config.build_batch_serializer()?; + let batch_encoder = BatchEncoder::new(batch_serializer); + + let content_type_str = self + .content_type + .clone() + .unwrap_or_else(|| batch_encoder.content_type().to_string()); + + let extension = self + .filename_extension + .clone() + .or_else(|| match batch_config { + BatchSerializerConfig::Parquet(_) => Some("parquet".to_string()), + #[allow(unreachable_patterns)] + _ => None, + }) + .unwrap_or_else(|| self.compression.extension().into()); + + if self.compression != Compression::None { + warn!("Top level compression setting ignored when batch_encoding set to parquet.") + } + + let request_settings = RequestSettings::new( + self, + cx, + EncoderKind::Batch(batch_encoder), + Compression::None, + extension, + content_type_str, + )?; + + let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol); + return Ok(VectorSink::from_event_streamsink(sink)); + } + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; + let enc = Encoder::::new(framer, serializer); + let content_type_str = self + .content_type + .clone() + .unwrap_or_else(|| enc.content_type().to_string()); + let extension = self + .filename_extension + .clone() + .unwrap_or_else(|| self.compression.extension().into()); + + let request_settings = RequestSettings::new( + self, + cx, + EncoderKind::Framed(Box::new(enc)), + self.compression, + extension, + content_type_str, + )?; + + let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol); Ok(VectorSink::from_event_streamsink(sink)) } @@ -340,7 +416,7 @@ struct RequestSettings { extension: String, time_format: String, append_uuid: bool, - encoder: (Transformer, Encoder), + encoder: (Transformer, EncoderKind), compression: Compression, tz_offset: Option, } @@ -348,7 +424,7 @@ struct RequestSettings { impl RequestBuilder<(String, Vec)> for RequestSettings { type Metadata = (String, EventFinalizers); type Events = Vec; - type Encoder = (Transformer, Encoder); + type Encoder = (Transformer, EncoderKind); type Payload = Bytes; type Request = GcsRequest; type Error = io::Error; @@ -417,22 +493,23 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { } impl RequestSettings { - fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result { + fn new( + config: &GcsSinkConfig, + cx: SinkContext, + encoder: EncoderKind, + compression: Compression, + extension: String, + content_type_str: String, + ) -> crate::Result { let transformer = config.encoding.transformer(); - let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; - let encoder = Encoder::::new(framer, serializer); + let acl = config .acl .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap()); - let content_type_str = config - .content_type - .as_deref() - .unwrap_or_else(|| encoder.content_type()); - let content_type = HeaderValue::from_str(content_type_str)?; + let content_type = HeaderValue::from_str(&content_type_str)?; let content_encoding = match &config.content_encoding { Some(ce) => Some(HeaderValue::from_str(ce)?), - None => config - .compression + None => compression .content_encoding() .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap()), }; @@ -453,10 +530,6 @@ impl RequestSettings { .collect::, _>>() }) .unwrap_or_else(|| Ok(vec![]))?; - let extension = config - .filename_extension - .clone() - .unwrap_or_else(|| config.compression.extension().into()); let time_format = config.filename_time_format.clone(); let append_uuid = config.filename_append_uuid; let offset = config @@ -474,7 +547,7 @@ impl RequestSettings { extension, time_format, append_uuid, - compression: config.compression, + compression, encoder: (transformer, encoder), tz_offset: offset, }) @@ -502,8 +575,12 @@ mod tests { request_metadata::GroupedCountByteSize, }; + use vector_lib::codecs::EncoderKind; + use vector_lib::codecs::encoding::Framer; + use super::*; use crate::{ + codecs::{Encoder, SinkType}, event::LogEvent, test_util::{ components::{SINK_TAGS, run_and_assert_sink_compliance}, @@ -562,8 +639,35 @@ mod tests { assert_eq!(key, "key: value"); } + fn try_request_settings( + sink_config: &GcsSinkConfig, + context: SinkContext, + ) -> crate::Result { + let (framer, serializer) = sink_config + .encoding + .build(SinkType::MessageBased) + .expect("Could not build encoder"); + let enc = Encoder::::new(framer, serializer); + let content_type_str = sink_config + .content_type + .clone() + .unwrap_or_else(|| enc.content_type().to_string()); + let extension = sink_config + .filename_extension + .clone() + .unwrap_or_else(|| sink_config.compression.extension().into()); + RequestSettings::new( + sink_config, + context, + EncoderKind::Framed(Box::new(enc)), + sink_config.compression, + extension, + content_type_str, + ) + } + fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings { - RequestSettings::new(sink_config, context).expect("Could not create request settings") + try_request_settings(sink_config, context).expect("Could not create request settings") } fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest { @@ -654,10 +758,13 @@ mod tests { let sink_config = GcsSinkConfig { // Invalid header value with newline character content_type: Some("text/plain\nInvalid".to_string()), - ..default_config((None::, TextSerializerConfig::default()).into()) + ..default_config(EncodingConfigWithFraming::from(( + None::, + TextSerializerConfig::default(), + ))) }; - let result = RequestSettings::new(&sink_config, context); + let result = try_request_settings(&sink_config, context); // Should return an error, not panic assert!(result.is_err()); } @@ -719,7 +826,7 @@ mod tests { ..default_config((None::, TextSerializerConfig::default()).into()) }; - let result = RequestSettings::new(&sink_config, context); + let result = try_request_settings(&sink_config, context); // Should return an error, not panic assert!(result.is_err()); } @@ -779,7 +886,7 @@ mod tests { ..default_config((None::, TextSerializerConfig::default()).into()) }; - let result = RequestSettings::new(&sink_config, context); + let result = try_request_settings(&sink_config, context); // Should return an error, not panic assert!(result.is_err()); }