diff --git a/Cargo.lock b/Cargo.lock index d4d9fb0..f87677f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1722,7 +1722,7 @@ dependencies = [ [[package]] name = "etl" version = "0.1.0" -source = "git+https://github.com/supabase/etl?rev=3122cb1c8df9649e9caaa4ee9d89e1611df29baf#3122cb1c8df9649e9caaa4ee9d89e1611df29baf" +source = "git+https://github.com/supabase/etl?rev=b47d296cca6ea7bcb5262b627e5c93ef81f8acc2#b47d296cca6ea7bcb5262b627e5c93ef81f8acc2" dependencies = [ "aws-lc-rs", "byteorder", @@ -1740,9 +1740,11 @@ dependencies = [ "rustls 0.23.36", "serde_json", "sqlx", + "sysinfo", "tokio", "tokio-postgres", "tokio-rustls 0.26.4", + "tokio-stream", "tracing", "uuid", "x509-cert", @@ -1751,7 +1753,7 @@ dependencies = [ [[package]] name = "etl-config" version = "0.1.0" -source = "git+https://github.com/supabase/etl?rev=3122cb1c8df9649e9caaa4ee9d89e1611df29baf#3122cb1c8df9649e9caaa4ee9d89e1611df29baf" +source = "git+https://github.com/supabase/etl?rev=b47d296cca6ea7bcb5262b627e5c93ef81f8acc2#b47d296cca6ea7bcb5262b627e5c93ef81f8acc2" dependencies = [ "config", "secrecy", @@ -1764,7 +1766,7 @@ dependencies = [ [[package]] name = "etl-postgres" version = "0.1.0" -source = "git+https://github.com/supabase/etl?rev=3122cb1c8df9649e9caaa4ee9d89e1611df29baf#3122cb1c8df9649e9caaa4ee9d89e1611df29baf" +source = "git+https://github.com/supabase/etl?rev=b47d296cca6ea7bcb5262b627e5c93ef81f8acc2#b47d296cca6ea7bcb5262b627e5c93ef81f8acc2" dependencies = [ "bytes", "chrono", @@ -3242,6 +3244,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ntapi" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3b335231dfd352ffb0f8017f3b6027a4917f7df785ea2143d8af2adc66980ae" +dependencies = [ + "winapi", +] + [[package]] name = "nuid" version = "0.5.0" @@ -3345,6 +3356,25 @@ dependencies = [ "syn", ] +[[package]] +name = "objc2-core-foundation" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" +dependencies = [ + "bitflags 2.10.0", +] + +[[package]] +name = "objc2-io-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "oid-registry" version = "0.8.1" @@ -3811,6 +3841,7 @@ dependencies = [ "serde_json", "serde_yaml", "sqlx", + "sysinfo", "temp-env", "tempfile", "testcontainers", @@ -5190,6 +5221,20 @@ dependencies = [ "syn", ] +[[package]] +name = "sysinfo" +version = "0.38.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92ab6a2f8bfe508deb3c6406578252e491d299cbbf3bc0529ecc3313aee4a52f" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -5529,6 +5574,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -6072,6 +6118,27 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -6085,6 +6152,17 @@ dependencies = [ "windows-strings", ] +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -6113,6 +6191,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core", + "windows-link", +] + [[package]] name = "windows-registry" version = "0.6.1" @@ -6226,6 +6314,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" diff --git a/Cargo.toml b/Cargo.toml index 974df7d..54dd974 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,10 +52,11 @@ tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", defa tokio-rustls = { version = "0.26.2", default-features = false } tracing = { version = "0.1.41", default-features = false } tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter"] } +sysinfo = { version = "0.38.4", default-features = false, features = ["system"] } x509-cert = { version = "0.2.2", default-features = false } -etl = { git = "https://github.com/supabase/etl", rev = "3122cb1c8df9649e9caaa4ee9d89e1611df29baf" } -etl-postgres = { git = "https://github.com/supabase/etl", rev = "3122cb1c8df9649e9caaa4ee9d89e1611df29baf" } +etl = { git = "https://github.com/supabase/etl", rev = "b47d296cca6ea7bcb5262b627e5c93ef81f8acc2" } +etl-postgres = { git = "https://github.com/supabase/etl", rev = "b47d296cca6ea7bcb5262b627e5c93ef81f8acc2" } uuid = { version = "1.19.0", default-features = false, features = ["v4"] } # Optional sink dependencies. diff --git a/README.md b/README.md index 048b2a3..78ce23b 100644 --- a/README.md +++ b/README.md @@ -68,8 +68,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: kafka diff --git a/docs/getting-started.md b/docs/getting-started.md index 57fb7df..105d9f7 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -31,8 +31,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: webhook diff --git a/docs/index.md b/docs/index.md index 9370e63..f1d8e39 100644 --- a/docs/index.md +++ b/docs/index.md @@ -68,8 +68,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: kafka diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index b67681a..e5c538e 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -21,8 +21,8 @@ stream: interval_secs: 10 retries: 3 batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: kafka @@ -148,25 +148,25 @@ Number of failed probes before the connection is considered dead. Controls how events are grouped before delivery. -#### `max_size` +#### `memory_budget_ratio` | | | |--|--| -| Type | integer | +| Type | float | | Required | No | -| Default | 1000 | +| Default | 0.2 | -Maximum events per batch. Larger batches improve throughput but increase latency. +Fraction of total process memory reserved for batched stream payloads. Higher values allow larger batches but increase memory usage. -#### `max_fill_secs` +#### `max_fill_ms` | | | |--|--| | Type | integer | | Required | No | -| Default | 5 | +| Default | 10000 | -Maximum time to fill a batch in seconds. Lower values reduce latency but may result in smaller batches. +Maximum time to fill a batch in milliseconds. Lower values reduce latency but may result in smaller batches. ## Sink Configuration diff --git a/docs/sinks/elasticsearch.md b/docs/sinks/elasticsearch.md index 4755e31..7c78415 100644 --- a/docs/sinks/elasticsearch.md +++ b/docs/sinks/elasticsearch.md @@ -77,8 +77,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: elasticsearch diff --git a/docs/sinks/gcp-pubsub.md b/docs/sinks/gcp-pubsub.md index c6fc07b..f855eac 100644 --- a/docs/sinks/gcp-pubsub.md +++ b/docs/sinks/gcp-pubsub.md @@ -77,8 +77,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: gcp-pubsub diff --git a/docs/sinks/kafka.md b/docs/sinks/kafka.md index 557bb39..8a89c77 100644 --- a/docs/sinks/kafka.md +++ b/docs/sinks/kafka.md @@ -71,8 +71,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: kafka diff --git a/docs/sinks/kinesis.md b/docs/sinks/kinesis.md index 50c7e4f..92b53b8 100644 --- a/docs/sinks/kinesis.md +++ b/docs/sinks/kinesis.md @@ -87,8 +87,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: kinesis diff --git a/docs/sinks/meilisearch.md b/docs/sinks/meilisearch.md index cb9da7f..c7ea765 100644 --- a/docs/sinks/meilisearch.md +++ b/docs/sinks/meilisearch.md @@ -89,8 +89,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: meilisearch diff --git a/docs/sinks/nats.md b/docs/sinks/nats.md index 0be642a..ea28ea4 100644 --- a/docs/sinks/nats.md +++ b/docs/sinks/nats.md @@ -62,8 +62,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: nats diff --git a/docs/sinks/rabbitmq.md b/docs/sinks/rabbitmq.md index beef78c..6bf6152 100644 --- a/docs/sinks/rabbitmq.md +++ b/docs/sinks/rabbitmq.md @@ -76,8 +76,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: rabbitmq diff --git a/docs/sinks/redis-streams.md b/docs/sinks/redis-streams.md index ca2bde5..e6e9369 100644 --- a/docs/sinks/redis-streams.md +++ b/docs/sinks/redis-streams.md @@ -75,8 +75,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: redis-streams diff --git a/docs/sinks/redis-strings.md b/docs/sinks/redis-strings.md index 58ca694..797f86d 100644 --- a/docs/sinks/redis-strings.md +++ b/docs/sinks/redis-strings.md @@ -73,8 +73,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: redis-strings diff --git a/docs/sinks/sns.md b/docs/sinks/sns.md index 7644cfd..3d1e450 100644 --- a/docs/sinks/sns.md +++ b/docs/sinks/sns.md @@ -83,8 +83,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: sns diff --git a/docs/sinks/sqs.md b/docs/sinks/sqs.md index 920041e..8084013 100644 --- a/docs/sinks/sqs.md +++ b/docs/sinks/sqs.md @@ -83,8 +83,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: sqs diff --git a/docs/sinks/webhook.md b/docs/sinks/webhook.md index 26d9c50..9c3f401 100644 --- a/docs/sinks/webhook.md +++ b/docs/sinks/webhook.md @@ -88,8 +88,8 @@ stream: tls: enabled: false batch: - max_size: 1000 - max_fill_secs: 5 + memory_budget_ratio: 0.2 + max_fill_ms: 5000 sink: type: webhook diff --git a/etl-migrations/20250827000000_base.sql b/etl-migrations/20250827000000_base.sql deleted file mode 100644 index 5a2d525..0000000 --- a/etl-migrations/20250827000000_base.sql +++ /dev/null @@ -1,76 +0,0 @@ --- Base schema for etl-replicator store (applied on the source DB) - --- Ensure etl schema exists (also set by runtime, but safe here) -create schema if not exists etl; - --- Enum for table replication state -create type etl.table_state as enum ( - 'init', - 'data_sync', - 'finished_copy', - 'sync_done', - 'ready', - 'errored' -); - --- Replication state -create table etl.replication_state ( - id bigint generated always as identity primary key, - pipeline_id bigint not null, - table_id oid not null, - state etl.table_state not null, - metadata jsonb, - prev bigint references etl.replication_state(id), - is_current boolean not null default true, - created_at timestamptz not null default now(), - updated_at timestamptz not null default now() -); - --- Ensures that there is only one current state per pipeline/table -create unique index uq_replication_state_current_true - on etl.replication_state (pipeline_id, table_id) - where is_current = true; - --- Table schemas (per pipeline, per table) -create table etl.table_schemas ( - id bigint generated always as identity primary key, - pipeline_id bigint not null, - table_id oid not null, - schema_name text not null, - table_name text not null, - created_at timestamptz not null default now(), - updated_at timestamptz not null default now(), - unique (pipeline_id, table_id) -); - -create index idx_table_schemas_pipeline_table - on etl.table_schemas (pipeline_id, table_id); - --- Columns for stored schemas -create table etl.table_columns ( - id bigint generated always as identity primary key, - table_schema_id bigint not null references etl.table_schemas(id) on delete cascade, - column_name text not null, - column_type text not null, - type_modifier integer not null, - nullable boolean not null, - primary_key boolean not null, - column_order integer not null, - created_at timestamptz not null default now(), - unique (table_schema_id, column_name), - unique (table_schema_id, column_order) -); - -create index idx_table_columns_order - on etl.table_columns (table_schema_id); - --- Source-to-destination table id mappings -create table etl.table_mappings ( - id bigint generated always as identity primary key, - pipeline_id bigint not null, - source_table_id oid not null, - destination_table_id text not null, - created_at timestamptz not null default now(), - updated_at timestamptz not null default now(), - unique (pipeline_id, source_table_id) -); diff --git a/src/concurrency/stream.rs b/src/concurrency/stream.rs index 92b2738..52493a6 100644 --- a/src/concurrency/stream.rs +++ b/src/concurrency/stream.rs @@ -1,26 +1,44 @@ use core::pin::Pin; use core::task::{Context, Poll}; use etl::config::BatchConfig; +use etl::types::SizeHint; use futures::{Future, Stream, ready}; use pin_project_lite::pin_project; use std::time::Duration; +use sysinfo::System; + +const MIN_BATCH_BYTES: usize = 64 * 1024; + +fn replay_batch_budget_bytes(batch_config: &BatchConfig) -> usize { + let mut system = System::new(); + system.refresh_memory(); + + let total_memory_bytes = usize::try_from(system.total_memory()).unwrap_or(usize::MAX); + let budget = ((total_memory_bytes as f64) * f64::from(batch_config.memory_budget_ratio)).round() + as usize; + + budget.max(MIN_BATCH_BYTES) +} // Implementation adapted from: // https://github.com/tokio-rs/tokio/blob/master/tokio-stream/src/stream_ext/chunks_timeout.rs. pin_project! { - /// A stream adapter that batches items based on size limits and timeouts. + /// A stream adapter that batches fallible items based on byte budget and timeouts. /// /// This stream collects items from the underlying stream into batches, emitting them when either: - /// - The batch reaches its maximum size + /// - The accumulated successful items reach the configured memory budget /// - A timeout occurs + /// - An error item is observed #[must_use = "streams do nothing unless polled"] #[derive(Debug)] - pub struct TimeoutBatchStream> { + pub struct TimeoutBatchStream>> { #[pin] stream: S, #[pin] deadline: Option, items: Vec, + current_batch_bytes: usize, + max_batch_bytes: usize, batch_config: BatchConfig, reset_timer: bool, inner_stream_ended: bool, @@ -28,15 +46,23 @@ pin_project! { } } -impl> TimeoutBatchStream { +impl TimeoutBatchStream +where + B: SizeHint, + S: Stream>, +{ /// Creates a new [`TimeoutBatchStream`] with the given configuration. /// /// The stream will batch items according to the provided `batch_config`. pub fn wrap(stream: S, batch_config: BatchConfig) -> Self { + let max_batch_bytes = replay_batch_budget_bytes(&batch_config); + TimeoutBatchStream { stream, deadline: None, - items: Vec::with_capacity(batch_config.max_size), + items: Vec::new(), + current_batch_bytes: 0, + max_batch_bytes, batch_config, reset_timer: true, inner_stream_ended: false, @@ -45,14 +71,19 @@ impl> TimeoutBatchStream { } } -impl> Stream for TimeoutBatchStream { +impl Stream for TimeoutBatchStream +where + B: SizeHint, + S: Stream>, +{ type Item = Vec; /// Polls the stream for the next batch of items using a complex state machine. /// /// This method implements a batching algorithm that balances throughput - /// and latency by collecting items into batches based on both size and time constraints. - /// The polling state machine handles multiple concurrent conditions. + /// and latency by collecting items into batches based on both byte budget + /// and time constraints. The polling state machine handles multiple + /// concurrent conditions. fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut().project(); @@ -77,27 +108,25 @@ impl> Stream for TimeoutBatchStream { *this.reset_timer = false; } - // PRIORITY 2: Memory optimization - // Pre-allocate batch capacity when starting to collect items - // This avoids reallocations during batch collection. - if this.items.is_empty() { - this.items.reserve_exact(this.batch_config.max_size); - } - - // PRIORITY 3: Poll underlying stream for new items + // PRIORITY 2: Poll underlying stream for new items match this.stream.as_mut().poll_next(cx) { Poll::Pending => { // No more items available right now, check if we should emit due to timeout. break; } Poll::Ready(Some(item)) => { - // New item available - add to current batch. + if let Ok(value) = &item { + *this.current_batch_bytes = + this.current_batch_bytes.saturating_add(value.size_hint()); + } + let item_is_err = item.is_err(); this.items.push(item); - // SIZE-BASED EMISSION: If batch is full, emit immediately. - // This provides throughput optimization for high-volume streams. - if this.items.len() >= this.batch_config.max_size { + // BUDGET-BASED EMISSION: If the batch reached its byte budget, + // emit immediately to keep replay batches bounded. + if *this.current_batch_bytes >= *this.max_batch_bytes || item_is_err { *this.reset_timer = true; // Schedule timer reset for next batch. + *this.current_batch_bytes = 0; return Poll::Ready(Some(std::mem::take(this.items))); } // Continue loop to collect more items or check other conditions. @@ -109,6 +138,7 @@ impl> Stream for TimeoutBatchStream { None // No final batch needed. } else { *this.reset_timer = true; // Clean up timer state. + *this.current_batch_bytes = 0; Some(std::mem::take(this.items)) }; @@ -119,16 +149,14 @@ impl> Stream for TimeoutBatchStream { } } - // PRIORITY 4: Time-based emission check - // If we have items and the timeout has expired, emit the current batch - // This provides latency bounds to prevent indefinite delays in low-volume scenarios. + // PRIORITY 3: Time-based emission check + // If we have items and the timeout has expired, emit the current batch. if !this.items.is_empty() && let Some(deadline) = this.deadline.as_pin_mut() { - // Check if timeout has elapsed (this will register waker if not ready). ready!(deadline.poll(cx)); - // Schedule timer reset for next batch. *this.reset_timer = true; + *this.current_batch_bytes = 0; return Poll::Ready(Some(std::mem::take(this.items))); } @@ -217,7 +245,6 @@ impl> Stream for TimeoutStream { // Check if timeout has already expired. let timeout_expired = this .deadline - .as_mut() .as_pin_mut() .map(|deadline| deadline.poll(cx).is_ready()) .unwrap_or(false); diff --git a/src/config/load.rs b/src/config/load.rs index 9327ee3..31edf5b 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -490,7 +490,7 @@ stream: enabled: false trusted_root_certs: "" batch: - max_size: 100 + memory_budget_ratio: 0.25 max_fill_ms: 50 sink: type: memory @@ -513,7 +513,7 @@ sink: "APP_STREAM__PG_CONNECTION__TLS__TRUSTED_ROOT_CERTS", Some(""), ), - ("APP_STREAM__BATCH__MAX_SIZE", Some("200")), + ("APP_STREAM__BATCH__MEMORY_BUDGET_RATIO", Some("0.4")), ("APP_STREAM__BATCH__MAX_FILL_MS", Some("100")), ("APP_SINK__TYPE", Some("memory")), ], @@ -524,7 +524,7 @@ sink: assert_eq!(config.stream.pg_connection.port, 5433); assert_eq!(config.stream.pg_connection.name, "mydb"); assert_eq!(config.stream.pg_connection.username, "user"); - assert_eq!(config.stream.batch.max_size, 200); + assert_eq!(config.stream.batch.memory_budget_ratio, 0.4); assert_eq!(config.stream.batch.max_fill_ms, 100); }, ); diff --git a/src/config/stream.rs b/src/config/stream.rs index ed9c6da..9a26e49 100644 --- a/src/config/stream.rs +++ b/src/config/stream.rs @@ -1,6 +1,6 @@ use etl::config::{ - BatchConfig, InvalidatedSlotBehavior, PgConnectionConfig, PgConnectionConfigWithoutSecrets, - PipelineConfig, TableSyncCopyConfig, + BatchConfig, InvalidatedSlotBehavior, MemoryBackpressureConfig, PgConnectionConfig, + PgConnectionConfigWithoutSecrets, PipelineConfig, TableSyncCopyConfig, }; use serde::{Deserialize, Serialize}; @@ -69,6 +69,9 @@ impl From for PipelineConfig { table_sync_copy: TableSyncCopyConfig::default(), // single table, serial copy is fine max_copy_connections_per_table: 1, + // Intentionally mirror upstream ETL defaults for memory-based backpressure. + memory_refresh_interval_ms: PipelineConfig::DEFAULT_MEMORY_REFRESH_INTERVAL_MS, + memory_backpressure: Some(MemoryBackpressureConfig::default()), // we handle recovery ourselves with checkpoint preservation invalidated_slot_behavior: InvalidatedSlotBehavior::Error, } diff --git a/src/core.rs b/src/core.rs index a2088b4..c1a31cd 100644 --- a/src/core.rs +++ b/src/core.rs @@ -4,7 +4,6 @@ use crate::{ config::{PipelineConfig, SinkConfig}, - migrations::migrate_etl, queries::get_slot_state, sink::{AnySink, memory::MemorySink}, slot_recovery::handle_slot_recovery, @@ -32,9 +31,6 @@ pub async fn start_pipeline_with_config(config: PipelineConfig) -> EtlResult<()> log_config(&config); - // Run etl migrations before starting the pipeline - migrate_etl(&config.stream.pg_connection).await?; - // Recovery loop - restarts the pipeline if slot is invalidated loop { let result = run_pipeline(&config).await; @@ -76,7 +72,8 @@ pub async fn start_pipeline_with_config(config: PipelineConfig) -> EtlResult<()> /// Runs the pipeline once. Returns when the pipeline completes or fails. async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> { // Initialize state store for ETL pipeline state tracking - let state_store = PostgresStore::new(config.stream.id, config.stream.pg_connection.clone()); + let state_store = + PostgresStore::new(config.stream.id, config.stream.pg_connection.clone()).await?; // Create sink based on configuration. let sink = match &config.sink { @@ -271,7 +268,7 @@ fn log_stream_config(config: &PipelineConfig) { dbname = stream.pg_connection.name, username = stream.pg_connection.username, tls_enabled = stream.pg_connection.tls.enabled, - max_batch_size = stream.batch.max_size, + batch_memory_budget_ratio = stream.batch.memory_budget_ratio, max_batch_fill_ms = stream.batch.max_fill_ms, "stream configuration" ); diff --git a/src/migrations.rs b/src/migrations.rs index efb8a4c..32d1aaf 100644 --- a/src/migrations.rs +++ b/src/migrations.rs @@ -8,42 +8,6 @@ use tracing::info; /// Number of database connections to use for the migration pool. const NUM_POOL_CONNECTIONS: u32 = 1; -/// Runs database migrations on the state store. -/// -/// Creates a connection pool to the source database, sets up the `etl` schema, -/// and applies all pending migrations. The migrations are run in the `etl` schema -/// to avoid cluttering the public schema with migration metadata tables created by `sqlx`. -pub async fn migrate_etl(connection_config: &PgConnectionConfig) -> Result<(), sqlx::Error> { - let options: PgConnectOptions = connection_config.with_db(Some(&ETL_MIGRATION_OPTIONS)); - - let pool = PgPoolOptions::new() - .max_connections(NUM_POOL_CONNECTIONS) - .min_connections(NUM_POOL_CONNECTIONS) - .after_connect(|conn, _meta| { - Box::pin(async move { - // Create the etl schema if it doesn't exist - conn.execute("create schema if not exists etl;").await?; - // We set the search_path to etl so that the _sqlx_migrations - // metadata table is created inside that schema instead of the public - // schema - conn.execute("set search_path = 'etl';").await?; - - Ok(()) - }) - }) - .connect_with(options) - .await?; - - info!("applying migrations in the state store before starting replicator"); - - let migrator = sqlx::migrate!("./etl-migrations"); - migrator.run(&pool).await?; - - info!("migrations successfully applied in the state store"); - - Ok(()) -} - /// Runs database migrations on pgstream state store. /// /// Creates a connection pool to the source database, sets up the `pgstream` schema, diff --git a/src/test_utils/database.rs b/src/test_utils/database.rs index cfba764..2fd2e8a 100644 --- a/src/test_utils/database.rs +++ b/src/test_utils/database.rs @@ -5,7 +5,7 @@ use etl::config::{IntoConnectOptions, PgConnectionConfig}; use sqlx::{Connection, Executor, PgConnection, PgPool, postgres::PgPoolOptions}; use tokio::runtime::Handle; -use crate::migrations::{migrate_etl, migrate_pgstream}; +use crate::migrations::migrate_pgstream; use crate::test_utils::test_pg_config; /// Test database wrapper with automatic cleanup on drop. @@ -30,11 +30,6 @@ impl TestDatabase { let config = test_pg_config().await; let pool = create_pg_database(&config).await; - // Run ETL migrations first (required for PostgresStore) - migrate_etl(&config) - .await - .expect("Failed to run ETL migrations"); - // Run pgstream migrations migrate_pgstream(&config) .await diff --git a/src/test_utils/helpers.rs b/src/test_utils/helpers.rs index ab830f2..24e2bef 100644 --- a/src/test_utils/helpers.rs +++ b/src/test_utils/helpers.rs @@ -31,8 +31,8 @@ pub fn test_stream_config_with_id(db: &TestDatabase, id: u64) -> StreamConfig { id, pg_connection: db.config.clone(), batch: etl::config::BatchConfig { - max_size: 100, max_fill_ms: 1000, + memory_budget_ratio: 0.2, }, } } @@ -52,16 +52,15 @@ pub fn make_test_event(table_id: TableId, payload: serde_json::Value) -> Event { Event::Insert(InsertEvent { start_lsn: PgLsn::from(0), commit_lsn: PgLsn::from(0), + tx_ordinal: 0, table_id, - table_row: TableRow { - values: vec![ - Cell::Uuid(Uuid::new_v4()), // id - Cell::Json(payload), // payload - Cell::Null, // metadata - Cell::I64(1), // stream_id - Cell::TimestampTz(timestamp), // created_at - ], - }, + table_row: TableRow::new(vec![ + Cell::Uuid(Uuid::new_v4()), // id + Cell::Json(payload), // payload + Cell::Null, // metadata + Cell::I64(1), // stream_id + Cell::TimestampTz(timestamp), // created_at + ]), }) } @@ -76,16 +75,15 @@ pub fn make_event_with_id( Event::Insert(InsertEvent { start_lsn: PgLsn::from(0), commit_lsn: PgLsn::from(0), + tx_ordinal: 0, table_id, - table_row: TableRow { - values: vec![ - Cell::Uuid(Uuid::parse_str(&id.id).unwrap()), // id - Cell::Json(payload), // payload - Cell::Null, // metadata - Cell::I64(1), // stream_id - Cell::TimestampTz(id.created_at), // created_at - ], - }, + table_row: TableRow::new(vec![ + Cell::Uuid(Uuid::parse_str(&id.id).unwrap()), // id + Cell::Json(payload), // payload + Cell::Null, // metadata + Cell::I64(1), // stream_id + Cell::TimestampTz(id.created_at), // created_at + ]), }) } diff --git a/src/test_utils/schema.rs b/src/test_utils/schema.rs index 507cc6c..df08fee 100644 --- a/src/test_utils/schema.rs +++ b/src/test_utils/schema.rs @@ -58,7 +58,9 @@ pub async fn create_postgres_store( config: &PgConnectionConfig, pool: &PgPool, ) -> PostgresStore { - let store = PostgresStore::new(stream_id, config.clone()); + let store = PostgresStore::new(stream_id, config.clone()) + .await + .expect("Failed to create PostgresStore"); init_events_schema(&store, pool) .await .expect("Failed to initialize events schema"); @@ -74,7 +76,9 @@ pub async fn create_postgres_store_with_table_id( config: &PgConnectionConfig, pool: &PgPool, ) -> (PostgresStore, TableId) { - let store = PostgresStore::new(stream_id, config.clone()); + let store = PostgresStore::new(stream_id, config.clone()) + .await + .expect("Failed to create PostgresStore"); let table_id = init_events_schema(&store, pool) .await .expect("Failed to initialize events schema"); diff --git a/src/types/event.rs b/src/types/event.rs index 88d0112..4b66176 100644 --- a/src/types/event.rs +++ b/src/types/event.rs @@ -87,7 +87,7 @@ pub fn convert_event_from_table( let mut lsn = None; for (idx, col) in column_schemas.iter().enumerate() { - let Some(cell) = table_row.values.get_mut(idx) else { + let Some(cell) = table_row.values_mut().get_mut(idx) else { continue; }; @@ -170,16 +170,14 @@ mod tests { created_at: chrono::DateTime, payload: serde_json::Value, ) -> TableRow { - TableRow { - values: vec![ - Cell::Uuid(id), - Cell::TimestampTz(created_at), - Cell::Json(payload), - Cell::Null, // metadata - Cell::I64(1), // stream_id - Cell::String("0/16B3748".to_string()), // lsn (parsed to PgLsn) - ], - } + TableRow::new(vec![ + Cell::Uuid(id), + Cell::TimestampTz(created_at), + Cell::Json(payload), + Cell::Null, // metadata + Cell::I64(1), // stream_id + Cell::String("0/16B3748".to_string()), // lsn (parsed to PgLsn) + ]) } fn make_table_row_without_lsn( @@ -187,16 +185,14 @@ mod tests { created_at: chrono::DateTime, payload: serde_json::Value, ) -> TableRow { - TableRow { - values: vec![ - Cell::Uuid(id), - Cell::TimestampTz(created_at), - Cell::Json(payload), - Cell::Null, // metadata - Cell::I64(1), // stream_id - Cell::Null, // lsn (null for events before migration) - ], - } + TableRow::new(vec![ + Cell::Uuid(id), + Cell::TimestampTz(created_at), + Cell::Json(payload), + Cell::Null, // metadata + Cell::I64(1), // stream_id + Cell::Null, // lsn (null for events before migration) + ]) } #[test] @@ -236,16 +232,14 @@ mod tests { #[test] fn test_convert_event_from_table_missing_id() { let column_schemas = make_column_schemas(); - let mut table_row = TableRow { - values: vec![ - Cell::Null, // Missing id - Cell::TimestampTz(Utc::now()), - Cell::Json(serde_json::json!({"test": "data"})), - Cell::Null, - Cell::I64(1), - Cell::Null, // lsn - ], - }; + let mut table_row = TableRow::new(vec![ + Cell::Null, // Missing id + Cell::TimestampTz(Utc::now()), + Cell::Json(serde_json::json!({"test": "data"})), + Cell::Null, + Cell::I64(1), + Cell::Null, // lsn + ]); let result = convert_event_from_table(&mut table_row, &column_schemas); @@ -256,16 +250,14 @@ mod tests { #[test] fn test_convert_event_from_table_missing_created_at() { let column_schemas = make_column_schemas(); - let mut table_row = TableRow { - values: vec![ - Cell::Uuid(Uuid::new_v4()), - Cell::Null, // Missing created_at - Cell::Json(serde_json::json!({"test": "data"})), - Cell::Null, - Cell::I64(1), - Cell::Null, // lsn - ], - }; + let mut table_row = TableRow::new(vec![ + Cell::Uuid(Uuid::new_v4()), + Cell::Null, // Missing created_at + Cell::Json(serde_json::json!({"test": "data"})), + Cell::Null, + Cell::I64(1), + Cell::Null, // lsn + ]); let result = convert_event_from_table(&mut table_row, &column_schemas); @@ -281,16 +273,14 @@ mod tests { #[test] fn test_convert_event_from_table_missing_payload() { let column_schemas = make_column_schemas(); - let mut table_row = TableRow { - values: vec![ - Cell::Uuid(Uuid::new_v4()), - Cell::TimestampTz(Utc::now()), - Cell::Null, // Missing payload - Cell::Null, - Cell::I64(1), - Cell::Null, // lsn - ], - }; + let mut table_row = TableRow::new(vec![ + Cell::Uuid(Uuid::new_v4()), + Cell::TimestampTz(Utc::now()), + Cell::Null, // Missing payload + Cell::Null, + Cell::I64(1), + Cell::Null, // lsn + ]); let result = convert_event_from_table(&mut table_row, &column_schemas); @@ -339,6 +329,7 @@ mod tests { Event::Insert(InsertEvent { start_lsn: PgLsn::from(0), commit_lsn: PgLsn::from(0), + tx_ordinal: 0, table_id: TableId::new(1), table_row: make_table_row(id, ts, serde_json::json!({"test": 1})), }), @@ -373,17 +364,16 @@ mod tests { let events = vec![Event::Insert(InsertEvent { start_lsn: PgLsn::from(0), commit_lsn: PgLsn::from(0), + tx_ordinal: 0, table_id: TableId::new(1), - table_row: TableRow { - values: vec![ - Cell::Null, // Missing id - Cell::TimestampTz(Utc::now()), - Cell::Json(serde_json::json!({"test": 1})), - Cell::Null, - Cell::I64(1), - Cell::Null, // lsn - ], - }, + table_row: TableRow::new(vec![ + Cell::Null, // Missing id + Cell::TimestampTz(Utc::now()), + Cell::Json(serde_json::json!({"test": 1})), + Cell::Null, + Cell::I64(1), + Cell::Null, // lsn + ]), })]; let result = convert_stream_events_from_events(events, &column_schemas); diff --git a/tests/slot_recovery_tests.rs b/tests/slot_recovery_tests.rs index 2e385a4..8aec340 100644 --- a/tests/slot_recovery_tests.rs +++ b/tests/slot_recovery_tests.rs @@ -10,7 +10,6 @@ use std::time::Duration; use etl::store::both::postgres::PostgresStore; -use postgres_stream::migrations::migrate_etl; use postgres_stream::queries::get_slot_state; use postgres_stream::sink::memory::MemorySink; use postgres_stream::slot_recovery::handle_slot_recovery; @@ -32,14 +31,11 @@ async fn test_start_with_inactive_invalidated_slot_triggers_recovery() { // The slot name follows the etl crate's naming convention let slot_name = format!("supabase_etl_apply_{pipeline_id}"); - // Step 1: Run ETL migrations - migrate_etl(&db.config) - .await - .expect("Failed to run ETL migrations"); - - // Step 2: Create and start a pipeline to establish the replication slot + // Step 1: Create and start a pipeline to establish the replication slot { - let state_store = PostgresStore::new(pipeline_id, db.config.clone()); + let state_store = PostgresStore::new(pipeline_id, db.config.clone()) + .await + .expect("Failed to create PostgresStore"); let sink = MemorySink::new(); let pgstream = PgStream::create(stream_config.clone(), sink, state_store.clone()) .await @@ -168,7 +164,9 @@ async fn test_start_with_inactive_invalidated_slot_triggers_recovery() { ); // Step 5: Try to restart the pipeline with the invalidated slot - let state_store = PostgresStore::new(pipeline_id, db.config.clone()); + let state_store = PostgresStore::new(pipeline_id, db.config.clone()) + .await + .expect("Failed to create PostgresStore"); let sink = MemorySink::new(); let pgstream = PgStream::create(stream_config.clone(), sink, state_store.clone()) .await @@ -259,11 +257,6 @@ async fn test_pipeline_recovers_from_invalidated_slot() { let pipeline_id = stream_config.id; let slot_name = format!("supabase_etl_apply_{pipeline_id}"); - // Run migrations - migrate_etl(&db.config) - .await - .expect("Failed to run ETL migrations"); - // Create a subscription so we generate events db.ensure_today_partition().await; @@ -272,7 +265,9 @@ async fn test_pipeline_recovers_from_invalidated_slot() { // Step 1: Start pipeline, wait for replication to be ready, then insert test events { - let state_store = PostgresStore::new(pipeline_id, db.config.clone()); + let state_store = PostgresStore::new(pipeline_id, db.config.clone()) + .await + .expect("Failed to create PostgresStore"); let pgstream = PgStream::create(stream_config.clone(), sink.clone(), state_store.clone()) .await .expect("Failed to create PgStream"); @@ -522,7 +517,9 @@ async fn test_pipeline_recovers_from_invalidated_slot() { // Step 6: Restart pipeline - ETL will create a new slot and run DataSync (which we skip) // When replication events arrive, handle_failover() will COPY missed events and clear checkpoint { - let state_store = PostgresStore::new(pipeline_id, db.config.clone()); + let state_store = PostgresStore::new(pipeline_id, db.config.clone()) + .await + .expect("Failed to create PostgresStore"); let pgstream = PgStream::create(stream_config.clone(), sink.clone(), state_store.clone()) .await .expect("Failed to create PgStream"); @@ -674,10 +671,6 @@ async fn test_slot_recovery_preserves_existing_failover_checkpoint() { let pipeline_id = stream_config.id; let slot_name = format!("supabase_etl_apply_{pipeline_id}"); - migrate_etl(&db.config) - .await - .expect("Failed to run ETL migrations"); - db.ensure_today_partition().await; // Create an existing failover checkpoint before the slot is created. @@ -695,7 +688,9 @@ async fn test_slot_recovery_preserves_existing_failover_checkpoint() { // Start and stop pipeline once to create the replication slot. { - let state_store = PostgresStore::new(pipeline_id, db.config.clone()); + let state_store = PostgresStore::new(pipeline_id, db.config.clone()) + .await + .expect("Failed to create PostgresStore"); let sink = MemorySink::new(); let pgstream = PgStream::create(stream_config.clone(), sink, state_store.clone()) .await diff --git a/tests/store_tests.rs b/tests/store_tests.rs index 37f00cd..ef53616 100644 --- a/tests/store_tests.rs +++ b/tests/store_tests.rs @@ -229,8 +229,8 @@ async fn test_store_multiple_streams_isolated() { id: 1, pg_connection: db.config.clone(), batch: etl::config::BatchConfig { - max_size: 100, max_fill_ms: 1000, + memory_budget_ratio: 0.2, }, }; @@ -238,8 +238,8 @@ async fn test_store_multiple_streams_isolated() { id: 2, pg_connection: db.config.clone(), batch: etl::config::BatchConfig { - max_size: 100, max_fill_ms: 1000, + memory_budget_ratio: 0.2, }, }; diff --git a/tests/stream_tests.rs b/tests/stream_tests.rs index d370ecd..39800d3 100644 --- a/tests/stream_tests.rs +++ b/tests/stream_tests.rs @@ -436,7 +436,7 @@ async fn test_failover_recovery_does_not_hang_when_replay_exceeds_batch_size() { .await .expect("Failed to create PgStream"); - // 250 events ensure replay window is larger than batch.max_size (100). + // 250 events ensure replay spans multiple replay batches under the configured memory budget. let event_ids = insert_events_to_db(&db, 250).await; // Event 0 succeeds.