From 44c0e84622fff7c65ca21e286d235b0b8d103546 Mon Sep 17 00:00:00 2001 From: John Sonnenschein Date: Thu, 16 Apr 2026 11:06:14 -0700 Subject: [PATCH 1/6] GCS parquet --- src/sinks/gcp/cloud_storage.rs | 97 +++++++++++++++---- .../components/sinks/gcp_cloud_storage.cue | 34 +++++++ 2 files changed, 114 insertions(+), 17 deletions(-) diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index c63abdb6689f9..07ef5a5823e2e 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -12,11 +12,13 @@ use tower::ServiceBuilder; use uuid::Uuid; use vector_lib::{ TimeZone, - codecs::encoding::Framer, + codecs::{EncoderKind, encoding::Framer}, configurable::configurable_component, event::{EventFinalizers, Finalizable}, request_metadata::RequestMetadata, }; +#[cfg(feature = "codecs-parquet")] +use vector_lib::codecs::{BatchEncoder, encoding::BatchSerializerConfig}; use crate::{ codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, @@ -151,6 +153,16 @@ pub struct GcsSinkConfig { #[serde(flatten)] encoding: EncodingConfigWithFraming, + /// Batch encoding configuration for columnar formats. + /// + /// When set, events are encoded together as a batch in a columnar format (e.g., Parquet) + /// instead of the standard per-event framing-based encoding. The columnar format handles + /// its own internal compression, so the top-level `compression` setting is bypassed. + #[cfg(feature = "codecs-parquet")] + #[configurable(derived)] + #[serde(default)] + pub batch_encoding: Option, + /// Compression configuration. /// /// All compression algorithms use the default compression level unless otherwise specified. @@ -238,6 +250,8 @@ fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig { content_encoding: Default::default(), cache_control: Default::default(), encoding, + #[cfg(feature = "codecs-parquet")] + batch_encoding: None, compression: Compression::gzip_default(), batch: Default::default(), endpoint: Default::default(), @@ -282,6 +296,10 @@ impl SinkConfig for GcsSinkConfig { } fn input(&self) -> Input { + #[cfg(feature = "codecs-parquet")] + if let Some(batch_config) = &self.batch_encoding { + return Input::new(batch_config.input_type() & DataType::Log); + } Input::new(self.encoding.config().1.input_type() & DataType::Log) } @@ -340,7 +358,7 @@ struct RequestSettings { extension: String, time_format: String, append_uuid: bool, - encoder: (Transformer, Encoder), + encoder: (Transformer, EncoderKind), compression: Compression, tz_offset: Option, } @@ -348,7 +366,7 @@ struct RequestSettings { impl RequestBuilder<(String, Vec)> for RequestSettings { type Metadata = (String, EventFinalizers); type Events = Vec; - type Encoder = (Transformer, Encoder); + type Encoder = (Transformer, EncoderKind); type Payload = Bytes; type Request = GcsRequest; type Error = io::Error; @@ -419,20 +437,69 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { impl RequestSettings { fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result { let transformer = config.encoding.transformer(); - let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; - let encoder = Encoder::::new(framer, serializer); + + // Build encoder, compression, extension, and content_type_str together + // so that batch mode (Parquet) can override all four coherently. + #[cfg(feature = "codecs-parquet")] + let (encoder, compression, extension, content_type_str) = + if let Some(batch_config) = &config.batch_encoding { + let batch_serializer = batch_config.build_batch_serializer()?; + let batch_encoder = BatchEncoder::new(batch_serializer); + let ct = config + .content_type + .clone() + .unwrap_or_else(|| batch_encoder.content_type().to_string()); + let ext = config + .filename_extension + .clone() + .or_else(|| match batch_config { + BatchSerializerConfig::Parquet(_) => Some("parquet".to_string()), + #[allow(unreachable_patterns)] + _ => None, + }) + .unwrap_or_else(|| config.compression.extension().into()); + ( + EncoderKind::Batch(batch_encoder), + Compression::None, + ext, + ct, + ) + } else { + let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; + let enc = Encoder::::new(framer, serializer); + let ct = config + .content_type + .clone() + .unwrap_or_else(|| enc.content_type().to_string()); + let ext = config + .filename_extension + .clone() + .unwrap_or_else(|| config.compression.extension().into()); + (EncoderKind::Framed(Box::new(enc)), config.compression, ext, ct) + }; + + #[cfg(not(feature = "codecs-parquet"))] + let (encoder, compression, extension, content_type_str) = { + let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; + let enc = Encoder::::new(framer, serializer); + let ct = config + .content_type + .clone() + .unwrap_or_else(|| enc.content_type().to_string()); + let ext = config + .filename_extension + .clone() + .unwrap_or_else(|| config.compression.extension().into()); + (EncoderKind::Framed(Box::new(enc)), config.compression, ext, ct) + }; + let acl = config .acl .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap()); - let content_type_str = config - .content_type - .as_deref() - .unwrap_or_else(|| encoder.content_type()); - let content_type = HeaderValue::from_str(content_type_str)?; + let content_type = HeaderValue::from_str(&content_type_str)?; let content_encoding = match &config.content_encoding { Some(ce) => Some(HeaderValue::from_str(ce)?), - None => config - .compression + None => compression .content_encoding() .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap()), }; @@ -453,10 +520,6 @@ impl RequestSettings { .collect::, _>>() }) .unwrap_or_else(|| Ok(vec![]))?; - let extension = config - .filename_extension - .clone() - .unwrap_or_else(|| config.compression.extension().into()); let time_format = config.filename_time_format.clone(); let append_uuid = config.filename_append_uuid; let offset = config @@ -474,7 +537,7 @@ impl RequestSettings { extension, time_format, append_uuid, - compression: config.compression, + compression, encoder: (transformer, encoder), tz_offset: offset, }) diff --git a/website/cue/reference/components/sinks/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/gcp_cloud_storage.cue index a17fa3ec59066..64f6fd40c69d8 100644 --- a/website/cue/reference/components/sinks/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/gcp_cloud_storage.cue @@ -170,6 +170,40 @@ components: sinks: gcp_cloud_storage: { data items with the object that are not part of the uploaded data. """ } + + parquet_encoding: { + title: "Parquet Batch Encoding" + body: """ + The GCS sink supports Apache Parquet batch encoding via the `batch_encoding` + option. When configured, events are encoded together as Parquet columnar files + instead of the default per-event encoding. Parquet handles compression + internally, so the top-level `compression` setting must be `"none"`. + Output files automatically use the `.parquet` extension and the + `Content-Type` is set to `application/vnd.apache.parquet`. + + This feature requires the `codecs-parquet` feature flag at compile time. + + ```toml + [sinks.gcs_parquet] + type = "gcp_cloud_storage" + bucket = "my-analytics-bucket" + key_prefix = "logs/date=%F/" + compression = "none" + + [sinks.gcs_parquet.batch_encoding] + codec = "parquet" + compression = "snappy" + + [[sinks.gcs_parquet.batch_encoding.schema]] + name = "message" + type = "utf8" + + [[sinks.gcs_parquet.batch_encoding.schema]] + name = "timestamp" + type = "timestamp_millisecond" + ``` + """ + } } permissions: iam: [ From 1c7370b43c7bd50e83989f5a74f0471c1b451de7 Mon Sep 17 00:00:00 2001 From: John Sonnenschein Date: Thu, 16 Apr 2026 11:26:45 -0700 Subject: [PATCH 2/6] update --- src/sinks/gcp/cloud_storage.rs | 159 ++++++++++++++++++++------------- 1 file changed, 97 insertions(+), 62 deletions(-) diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index 07ef5a5823e2e..f3275eeb8a2e0 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -317,21 +317,79 @@ impl GcsSinkConfig { cx: SinkContext, ) -> crate::Result { let request = self.request.into_settings(); - let batch_settings = self.batch.into_batcher_settings()?; - let partitioner = self.key_partitioner()?; - let protocol = get_http_scheme_from_uri(&base_url.parse::().unwrap()); let svc = ServiceBuilder::new() .settings(request, GcsRetryLogic::default()) .service(GcsService::new(client, base_url, auth)); - let request_settings = RequestSettings::new(self, cx)?; + #[cfg(feature = "codecs-parquet")] + if let Some(batch_config) = &self.batch_encoding { + if !matches!(batch_config, BatchSerializerConfig::Parquet(_)) { + return Err( + "batch_encoding only supports encoding with parquet format for gcp_cloud_storage sink" + .into(), + ); + } - let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol); + let batch_serializer = batch_config.build_batch_serializer()?; + let batch_encoder = BatchEncoder::new(batch_serializer); + + let content_type_str = self + .content_type + .clone() + .unwrap_or_else(|| batch_encoder.content_type().to_string()); + + let extension = self + .filename_extension + .clone() + .or_else(|| match batch_config { + BatchSerializerConfig::Parquet(_) => Some("parquet".to_string()), + #[allow(unreachable_patterns)] + _ => None, + }) + .unwrap_or_else(|| self.compression.extension().into()); + + if self.compression != Compression::None { + warn!("Top level compression setting ignored when batch_encoding set to parquet.") + } + let request_settings = RequestSettings::new( + self, + cx, + EncoderKind::Batch(batch_encoder), + Compression::None, + extension, + content_type_str, + )?; + + let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol); + return Ok(VectorSink::from_event_streamsink(sink)); + } + + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; + let enc = Encoder::::new(framer, serializer); + let content_type_str = self + .content_type + .clone() + .unwrap_or_else(|| enc.content_type().to_string()); + let extension = self + .filename_extension + .clone() + .unwrap_or_else(|| self.compression.extension().into()); + + let request_settings = RequestSettings::new( + self, + cx, + EncoderKind::Framed(Box::new(enc)), + self.compression, + extension, + content_type_str, + )?; + + let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol); Ok(VectorSink::from_event_streamsink(sink)) } @@ -435,64 +493,16 @@ impl RequestBuilder<(String, Vec)> for RequestSettings { } impl RequestSettings { - fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result { + fn new( + config: &GcsSinkConfig, + cx: SinkContext, + encoder: EncoderKind, + compression: Compression, + extension: String, + content_type_str: String, + ) -> crate::Result { let transformer = config.encoding.transformer(); - // Build encoder, compression, extension, and content_type_str together - // so that batch mode (Parquet) can override all four coherently. - #[cfg(feature = "codecs-parquet")] - let (encoder, compression, extension, content_type_str) = - if let Some(batch_config) = &config.batch_encoding { - let batch_serializer = batch_config.build_batch_serializer()?; - let batch_encoder = BatchEncoder::new(batch_serializer); - let ct = config - .content_type - .clone() - .unwrap_or_else(|| batch_encoder.content_type().to_string()); - let ext = config - .filename_extension - .clone() - .or_else(|| match batch_config { - BatchSerializerConfig::Parquet(_) => Some("parquet".to_string()), - #[allow(unreachable_patterns)] - _ => None, - }) - .unwrap_or_else(|| config.compression.extension().into()); - ( - EncoderKind::Batch(batch_encoder), - Compression::None, - ext, - ct, - ) - } else { - let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; - let enc = Encoder::::new(framer, serializer); - let ct = config - .content_type - .clone() - .unwrap_or_else(|| enc.content_type().to_string()); - let ext = config - .filename_extension - .clone() - .unwrap_or_else(|| config.compression.extension().into()); - (EncoderKind::Framed(Box::new(enc)), config.compression, ext, ct) - }; - - #[cfg(not(feature = "codecs-parquet"))] - let (encoder, compression, extension, content_type_str) = { - let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?; - let enc = Encoder::::new(framer, serializer); - let ct = config - .content_type - .clone() - .unwrap_or_else(|| enc.content_type().to_string()); - let ext = config - .filename_extension - .clone() - .unwrap_or_else(|| config.compression.extension().into()); - (EncoderKind::Framed(Box::new(enc)), config.compression, ext, ct) - }; - let acl = config .acl .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap()); @@ -565,8 +575,12 @@ mod tests { request_metadata::GroupedCountByteSize, }; + use vector_lib::codecs::encoding::Framer; + use vector_lib::codecs::EncoderKind; + use super::*; use crate::{ + codecs::{Encoder, SinkType}, event::LogEvent, test_util::{ components::{SINK_TAGS, run_and_assert_sink_compliance}, @@ -626,7 +640,28 @@ mod tests { } fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings { - RequestSettings::new(sink_config, context).expect("Could not create request settings") + let (framer, serializer) = sink_config + .encoding + .build(SinkType::MessageBased) + .expect("Could not build encoder"); + let enc = Encoder::::new(framer, serializer); + let content_type_str = sink_config + .content_type + .clone() + .unwrap_or_else(|| enc.content_type().to_string()); + let extension = sink_config + .filename_extension + .clone() + .unwrap_or_else(|| sink_config.compression.extension().into()); + RequestSettings::new( + sink_config, + context, + EncoderKind::Framed(Box::new(enc)), + sink_config.compression, + extension, + content_type_str, + ) + .expect("Could not create request settings") } fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest { From 3e182e6861a81a66c08edff03a08dbd98be0b051 Mon Sep 17 00:00:00 2001 From: John Sonnenschein Date: Thu, 16 Apr 2026 12:01:21 -0700 Subject: [PATCH 3/6] fix tests --- src/sinks/gcp/cloud_storage.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index f3275eeb8a2e0..6278471751827 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -639,7 +639,10 @@ mod tests { assert_eq!(key, "key: value"); } - fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings { + fn try_request_settings( + sink_config: &GcsSinkConfig, + context: SinkContext, + ) -> crate::Result { let (framer, serializer) = sink_config .encoding .build(SinkType::MessageBased) @@ -661,7 +664,10 @@ mod tests { extension, content_type_str, ) - .expect("Could not create request settings") + } + + fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings { + try_request_settings(sink_config, context).expect("Could not create request settings") } fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest { @@ -752,10 +758,10 @@ mod tests { let sink_config = GcsSinkConfig { // Invalid header value with newline character content_type: Some("text/plain\nInvalid".to_string()), - ..default_config((None::, TextSerializerConfig::default()).into()) + ..default_config(EncodingConfigWithFraming::from((None::, TextSerializerConfig::default()))) }; - let result = RequestSettings::new(&sink_config, context); + let result = try_request_settings(&sink_config, context); // Should return an error, not panic assert!(result.is_err()); } @@ -817,7 +823,7 @@ mod tests { ..default_config((None::, TextSerializerConfig::default()).into()) }; - let result = RequestSettings::new(&sink_config, context); + let result = try_request_settings(&sink_config, context); // Should return an error, not panic assert!(result.is_err()); } @@ -877,7 +883,7 @@ mod tests { ..default_config((None::, TextSerializerConfig::default()).into()) }; - let result = RequestSettings::new(&sink_config, context); + let result = try_request_settings(&sink_config, context); // Should return an error, not panic assert!(result.is_err()); } From 58d86c8881ec20cddd9f31a46672b0d066760fe1 Mon Sep 17 00:00:00 2001 From: John Sonnenschein Date: Thu, 16 Apr 2026 12:44:01 -0700 Subject: [PATCH 4/6] fmt --- src/sinks/gcp/cloud_storage.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index 6278471751827..89cb37d4c31e1 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -10,6 +10,8 @@ use indoc::indoc; use snafu::{ResultExt, Snafu}; use tower::ServiceBuilder; use uuid::Uuid; +#[cfg(feature = "codecs-parquet")] +use vector_lib::codecs::{BatchEncoder, encoding::BatchSerializerConfig}; use vector_lib::{ TimeZone, codecs::{EncoderKind, encoding::Framer}, @@ -17,8 +19,6 @@ use vector_lib::{ event::{EventFinalizers, Finalizable}, request_metadata::RequestMetadata, }; -#[cfg(feature = "codecs-parquet")] -use vector_lib::codecs::{BatchEncoder, encoding::BatchSerializerConfig}; use crate::{ codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, @@ -575,8 +575,8 @@ mod tests { request_metadata::GroupedCountByteSize, }; - use vector_lib::codecs::encoding::Framer; use vector_lib::codecs::EncoderKind; + use vector_lib::codecs::encoding::Framer; use super::*; use crate::{ @@ -758,7 +758,10 @@ mod tests { let sink_config = GcsSinkConfig { // Invalid header value with newline character content_type: Some("text/plain\nInvalid".to_string()), - ..default_config(EncodingConfigWithFraming::from((None::, TextSerializerConfig::default()))) + ..default_config(EncodingConfigWithFraming::from(( + None::, + TextSerializerConfig::default(), + ))) }; let result = try_request_settings(&sink_config, context); From 4d0449903aafbbf219167d735d467b9f63eb4bdf Mon Sep 17 00:00:00 2001 From: John Sonnenschein Date: Thu, 16 Apr 2026 12:46:31 -0700 Subject: [PATCH 5/6] changelog --- changelog.d/25216_gcp_cloud_storage_parquet.feature.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog.d/25216_gcp_cloud_storage_parquet.feature.md diff --git a/changelog.d/25216_gcp_cloud_storage_parquet.feature.md b/changelog.d/25216_gcp_cloud_storage_parquet.feature.md new file mode 100644 index 0000000000000..db34ad56ab471 --- /dev/null +++ b/changelog.d/25216_gcp_cloud_storage_parquet.feature.md @@ -0,0 +1,5 @@ +Add Apache Parquet batch encoding support for the `gcp_cloud_storage` sink with flexible schema definitions. + +Enable the `codecs-parquet` feature and configure `batch_encoding` with `codec = "parquet"` in the gcp_cloud_storage sink configuration. + +authors: sonnens From e42777e897b43b9a478a4395e62ebcafe0618ef3 Mon Sep 17 00:00:00 2001 From: John Sonnenschein Date: Thu, 16 Apr 2026 13:06:15 -0700 Subject: [PATCH 6/6] cue --- .../components/sinks/gcp_cloud_storage.cue | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/website/cue/reference/components/sinks/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/gcp_cloud_storage.cue index 64f6fd40c69d8..a17fa3ec59066 100644 --- a/website/cue/reference/components/sinks/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/gcp_cloud_storage.cue @@ -170,40 +170,6 @@ components: sinks: gcp_cloud_storage: { data items with the object that are not part of the uploaded data. """ } - - parquet_encoding: { - title: "Parquet Batch Encoding" - body: """ - The GCS sink supports Apache Parquet batch encoding via the `batch_encoding` - option. When configured, events are encoded together as Parquet columnar files - instead of the default per-event encoding. Parquet handles compression - internally, so the top-level `compression` setting must be `"none"`. - Output files automatically use the `.parquet` extension and the - `Content-Type` is set to `application/vnd.apache.parquet`. - - This feature requires the `codecs-parquet` feature flag at compile time. - - ```toml - [sinks.gcs_parquet] - type = "gcp_cloud_storage" - bucket = "my-analytics-bucket" - key_prefix = "logs/date=%F/" - compression = "none" - - [sinks.gcs_parquet.batch_encoding] - codec = "parquet" - compression = "snappy" - - [[sinks.gcs_parquet.batch_encoding.schema]] - name = "message" - type = "utf8" - - [[sinks.gcs_parquet.batch_encoding.schema]] - name = "timestamp" - type = "timestamp_millisecond" - ``` - """ - } } permissions: iam: [