Add Variant SELECT rewriter and comprehensive architecture docs #16
- Consolidate WALRUS_DATA_DIR and FOYER_CACHE_DIR into single TIMEFUSION_DATA_DIR with derived subdirs (wal/, cache/)
- Add VariantConversionExec to convert string columns to Variant during INSERT
- Add VariantInsertRewriter analyzer rule to rewrite string literals for Variant columns
- Add is_schema_compatible_for_insert() for flexible INSERT type checking
- Split optimizers.rs into optimizers/ module directory
- Improve query sanitization for INSERT and long queries
- Add VariantSelectRewriter analyzer rule to wrap Variant columns with variant_to_json() in SELECT projections for PostgreSQL wire protocol
- Add comprehensive documentation:
  * docs/ARCHITECTURE.md: Full system architecture overview
  * docs/VARIANT_TYPE_SYSTEM.md: Variant type implementation details
  * docs/WAL.md: Write-ahead log implementation and recovery
- Update database.rs with unified table storage model improvements
- Update DML operations with buffered layer integration
- Align otel_logs_and_spans schema with monoscope
- Fix test configurations for new architecture
PR Review: Add Variant SELECT rewriter and comprehensive architecture docs
Summary
This is a well-structured PR that adds critical functionality for Variant type handling and significantly improves documentation. The implementation is thoughtful and addresses PostgreSQL wire protocol compatibility correctly.
✅ Strengths
Code Quality
Best Practices
🔍 Issues & Recommendations
Security Concerns
HIGH: Credentials in Database Schema (database.rs:570-571)
s3_access_key_id VARCHAR(500) NOT NULL,
s3_secret_access_key VARCHAR(500) NOT NULL,
Storing credentials in plaintext is a security vulnerability. Consider:
MEDIUM: Potential SQL Injection in Dynamic Queries
Code Quality Issues
1. Missing Error Context (variant_insert_rewriter.rs:32)
plan.transform_up(|node| rewrite_insert_node(node)).map(|t| t.data)
Consider adding error context to help with debugging:
plan.transform_up(|node| rewrite_insert_node(node))
    .map(|t| t.data)
    .map_err(|e| DataFusionError::Plan(format!("VariantInsertRewriter failed: {}", e)))
2. Recursive Transformation Could Stack Overflow (variant_insert_rewriter.rs:94-104)
3. Magic Number (optimizers/mod.rs:29)
let date_scalar = ScalarValue::Date32(Some(date.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp() as i32 / 86400));
The division by 86400 (seconds per day) should be a named constant:
const SECONDS_PER_DAY: i32 = 86400;
4. Unused Code Warning (optimizers/mod.rs:7-10)
// Remove unused imports warning - these are used by the submodules indirectly
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
use datafusion::scalar::ScalarValue;
These imports ARE used directly in the time_range_partition_pruner module below. The comment is misleading - just remove it.
Performance Considerations
1. Cloning in Hot Path (variant_select_rewriter.rs:40)
expr.clone()
While necessary for the TreeNode pattern, consider documenting that this is acceptable because:
2. Batch Conversion Efficiency (database.rs:100-152)
Test Coverage
GOOD:
NEEDS IMPROVEMENT:
Recommendation: Add unit tests in
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::prelude::*;

    #[test]
    fn test_variant_select_no_double_wrap() {
        // Test that already-wrapped columns aren't wrapped again
    }

    #[test]
    fn test_variant_insert_with_partial_columns() {
        // Test INSERT with fewer columns than table schema
    }
}
Documentation
Minor Issues:
🎯 Action Items
Before Merge (Required):
Post-Merge (Recommended):
📊 Metrics
Conclusion
This is high-quality work that significantly improves the Variant type system and documentation. The security concern around credential storage should be addressed before merging, but otherwise the implementation is solid. The new documentation is comprehensive and will help onboard contributors.
Recommendation: Approve with requested changes to credential storage
Co-Authored-By: Claude Sonnet 4.5 noreply@anthropic.com
variant_columns_to_json() was using RecordBatch::try_new() which fails when creating batches with 0 columns (empty projections used by COUNT(*)) because Arrow requires either columns or an explicit row count. Changed to try_new_with_options() to preserve the original batch's row count, fixing queries like SELECT COUNT(*) that don't need any columns.
Transient network errors like "error sending request" were failing immediately with no retries. Added:
- RetryConfig: 5 retries with exponential backoff (100ms-15s)
- ClientOptions: 30s connect timeout, 5min request timeout
This should resolve intermittent flush failures to R2/S3.
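The retry numbers in this commit (5 retries, 100ms initial delay, 15s cap) can be illustrated with a small std-only sketch. `RetryPolicy`, `delay`, and `retry_with_backoff` are hypothetical names for illustration, and the doubling factor is an assumption; this is not the `object_store` implementation, which has its own `RetryConfig` and backoff logic.

```rust
use std::time::Duration;

/// Hypothetical policy mirroring the numbers quoted in the commit message.
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicy {
    pub max_retries: u32,
    pub initial: Duration,
    pub max: Duration,
}

impl Default for RetryPolicy {
    fn default() -> Self {
        Self {
            max_retries: 5,
            initial: Duration::from_millis(100),
            max: Duration::from_secs(15),
        }
    }
}

impl RetryPolicy {
    /// Delay before retry number `attempt` (0-based): initial * 2^attempt, capped at `max`.
    /// The base of 2 is an assumption made for this sketch.
    pub fn delay(&self, attempt: u32) -> Duration {
        let factor = 2u32.saturating_pow(attempt);
        self.initial.saturating_mul(factor).min(self.max)
    }
}

/// Calls `op` until it succeeds or the retry budget is spent.
/// `sleep` is injected so callers can pick a blocking or async-friendly wait.
pub fn retry_with_backoff<T, E>(
    policy: RetryPolicy,
    mut sleep: impl FnMut(Duration),
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Budget exhausted: surface the last error to the caller.
            Err(e) if attempt >= policy.max_retries => return Err(e),
            Err(_) => {
                sleep(policy.delay(attempt));
                attempt += 1;
            }
        }
    }
}
```

With these defaults an operation that always fails is attempted six times in total (one initial call plus five retries).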
- Replace blocking std::thread::sleep with tokio::time::sleep in CAS retry loop to avoid starving the Tokio executor under contention
- Fix DML memory tracking: recalculate bucket memory_bytes after DELETE/UPDATE operations to prevent premature flush triggers
- Improve WAL recovery resilience: catch schema-incompatible entries instead of aborting recovery, add empty batch skip
- Add timestamp range filtering to MemBuffer queries: extract bounds from filter expressions and skip non-overlapping time buckets
- Switch from GreedyMemoryPool to FairSpillPool for per-query memory fairness and automatic spill-to-disk under pressure
- Make WAL fsync interval configurable via TIMEFUSION_WAL_FSYNC_MS env var (default 200ms)
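As an illustration of the last bullet, a minimal sketch of reading the fsync interval with the 200ms default. The function names are hypothetical, and falling back to the default on an unparsable value is an assumption; the commit does not say how invalid values are handled.

```rust
use std::time::Duration;

/// Default quoted in the commit message.
const DEFAULT_WAL_FSYNC_MS: u64 = 200;

/// Parses the raw value of TIMEFUSION_WAL_FSYNC_MS.
/// Assumption: a missing or non-numeric value silently falls back to the default.
pub fn parse_fsync_interval(raw: Option<&str>) -> Duration {
    let ms = raw
        .and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_WAL_FSYNC_MS);
    Duration::from_millis(ms)
}

/// Reads the interval from the process environment.
pub fn fsync_interval_from_env() -> Duration {
    parse_fsync_interval(std::env::var("TIMEFUSION_WAL_FSYNC_MS").ok().as_deref())
}
```

Keeping the parsing separate from the environment lookup makes the default and fallback behavior testable without mutating process state.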
… optimization
- WAL serialization now uses Arrow IPC (v129) instead of custom CompactBatch, with automatic fallback to legacy v128 format for existing WAL entries
- MemBuffer compacts multiple small batches into a single RecordBatch before flush to reduce small file writes
- Optimization window, min file threshold, and light optimize target size are now configurable via maintenance config
- Switch TimeBucket from RwLock to parking_lot::Mutex for lower overhead
- Add compact-on-read in query()/query_partitioned(): first read compacts batches in-place, subsequent reads get pre-compacted single batch
- Remove insert-time compaction that caused +64% batch_api write regression
- Revert WAL serialization from Arrow IPC back to bincode CompactBatch (IPC schema preamble overhead caused +18-20% SQL insert regression)
- Keep IPC deserialization as fallback for backward compatibility
- Skip VariantToJsonExec wrapper for tables with no Variant columns
- Add bloom filter config (timefusion_bloom_filter_disabled), enabled by default
- Add WAL file monitoring and emergency flush on file count threshold
- Add criterion benchmarks for write, read, S3 flush, and S3 read paths
Code Review: PR #16 - Variant SELECT/INSERT Rewriter and Architecture Docs
Overview
This is a well-architected and production-ready PR that adds comprehensive Variant type support, extensive documentation, and performance optimizations. The implementation is solid with clean separation of concerns and proper integration into the DataFusion query pipeline.
Overall Assessment: ✅ Approve with minor recommendations
🎯 Key Strengths
1. Clean Implementation
2. Comprehensive Documentation
3. Performance Focus
4. Type Safety & Error Handling
|
Code Review - PR #16: Variant SELECT Rewriter and Architecture Documentation
Overall Assessment
Approval Status: ✅ Approve with minor suggestions
This is a high-quality PR with well-designed components. The VariantSelectRewriter and VariantInsertRewriter are clean, focused implementations that integrate well with DataFusion's analyzer pipeline.
Component Reviews
1. VariantSelectRewriter (
|
PR Review: Add Variant SELECT rewriter and comprehensive architecture docsOverall this is a substantial, well-structured PR with meaningful improvements across the board. The unified storage model, session-aware DML, and timestamp-based bucket pruning are all solid additions. I've flagged several issues worth addressing before merging. Bugs / Correctness Issues1. In 2. // tests/buffer_consistency_test.rs
unsafe { std::env::set_var("WALRUS_DATA_DIR", &cfg.core.timefusion_data_dir) };
3. WAL deserialization:
4. // src/wal.rs
return Err(WalError::UnsupportedVersion {
version: data[4],
expected: WAL_VERSION_IPC, // reports 129, but 128 is also valid
});Both 128 and 129 are now valid. Reporting 5. // src/pgwire_handlers.rs
format!("{}...", &query[..MAX_LEN])Slicing a Performance Concerns6. In both 7. if let Some(ref pool) = self.config_pool
&& let Ok(new_configs) = Self::load_storage_configs(pool).await
{
let mut configs = self.storage_configs.write().await;
*configs = new_configs;
}This issues a DB query and takes a write lock on every 8.
9. Bloom filters enabled globally with fixed NDV .set_bloom_filter_enabled(!self.config.parquet.timefusion_bloom_filter_disabled)
.set_bloom_filter_ndv(100_000)Applying bloom filters with a fixed NDV of 100k to every column — including high-cardinality columns (UUIDs, full-text fields) and array/struct columns — will significantly increase Parquet file sizes without benefit for those column types. Consider restricting to known low-to-medium cardinality key columns. Design / Architecture Concerns10. Dual Variant→JSON conversion paths may interact unexpectedly The PR introduces two separate mechanisms for converting Variant to JSON on SELECT:
If both are active on the same query, a Variant column could be double-converted, potentially causing a runtime panic (the physical exec expects a 11. fn delta_session_from(session: &SessionState) -> Arc<dyn Session> {
Arc::new(
SessionStateBuilder::new()
.with_config(session.config().clone())
.with_runtime_env(session.runtime_env().clone())
.with_default_features()
.with_query_planner(deltalake::delta_datafusion::planner::DeltaPlanner::new())
.build(),
)
}This builds a fresh session without the registered UDFs (JSON functions, Variant UDFs), catalog providers, or other extensions. If delta-rs internally plans the predicate using those functions, this will fail silently or produce incorrect DELETE/UPDATE results. Consider building this session once at startup, or finding a way to extend the existing session state with only the planner swapped. 12. The filter pruning only recognizes flat Minor / Nits
DocumentationThe three architecture docs (
|
PR Review: Add Variant SELECT rewriter and comprehensive architecture docsOverall this is a solid PR with well-motivated changes — the unified storage model is a meaningful architectural improvement and the CriticalIncomplete analyzer rules list breaks
let analyzer_rules = vec![
ResolveGroupingFunction,
VariantInsertRewriter,
TypeCoercion,
VariantSelectRewriter,
];DataFusion's default set also includes
This directly explains the test regression in // Verify we can query specific columns (SELECT * fails due to Variant column encoding)The failure is not a Variant encoding issue — it is the missing wildcard expansion rule. The fix is to insert the custom rules into the default list rather than replacing it entirely, e.g. by building around BugsVersion tracking for unified tables is always stale In versions.get(&("".to_string(), table_name.to_string())).cloned()But when data is written, the version is recorded under the actual project ID (e.g. Wrong path set in
unsafe { std::env::set_var("WALRUS_DATA_DIR", &cfg.core.timefusion_data_dir) };But WAL unsupported-version error message is misleading
Err(WalError::UnsupportedVersion {
version: data[4],
expected: WAL_VERSION_IPC, // always says "expected 129"
})If a truly unknown version appears, the message incorrectly implies v129 was the only valid option. Consider a message string like Design ConcernsIn-place compaction during reads
let mut batches = bucket.batches.lock();
if batches.len() > 1 {
if let Ok(single) = concat_batches(...) {
batches.clear();
batches.push(single);
}
}Read operations mutating shared state is surprising. A failed compaction (silently ignored via Hardcoded timestamp column name in filter pushdown
let is_ts = matches!(left.as_ref(), Expr::Column(c) if c.name == "timestamp");This silently skips pruning for tables where the timestamp column has a different name, and won't match qualified column references like Performance Notes8x batch size increase
Bloom filters enabled by default Bloom filters add write overhead and increase Parquet file size. The Minor
|
PR Review: Add Variant SELECT rewriter and comprehensive architecture docsOverall this is a well-structured PR that closes an important gap — Variant columns now serialize correctly through the PostgreSQL wire protocol in both INSERT and SELECT paths. The documentation additions are thorough and the config consolidation is clean. A few areas worth addressing before merge are noted below. Potential Bugs1. In This means parameterized INSERTs like 2. In 3. Potential byte-index mismatch in In let lower = query.to_lowercase();
let table_end = lower.find('(').or_else(|| lower.find("values")).unwrap_or(lower.len());
let table_part = query[..table_end].trim_end();
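The byte-index concern in item 3 comes from `str::to_lowercase` being allowed to change a string's byte length (for example, `İ` is 2 bytes while its lowercase form is 3), so an offset found in `lower` need not be valid in `query`. One possible fix, sketched below under the assumption that only ASCII keywords need case-insensitive matching, is `to_ascii_lowercase`, which never changes byte lengths. The wrapper function `table_part` is a hypothetical name for illustration.

```rust
/// Returns the text before the first '(' or, failing that, before the
/// keyword "values" (matched ASCII case-insensitively).
pub fn table_part(query: &str) -> &str {
    // ASCII lowercasing maps bytes one-to-one, so every offset found in
    // `lower` is also a valid char boundary in `query`.
    let lower = query.to_ascii_lowercase();
    let table_end = lower
        .find('(')
        .or_else(|| lower.find("values"))
        .unwrap_or(lower.len());
    query[..table_end].trim_end()
}
```

Identifiers containing non-ASCII characters pass through untouched, and the slice can no longer land in the middle of a multi-byte character.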
Code Quality & Maintainability4. Dual conversion paths for SELECT may cause confusion The PR adds This is defensible as belt-and-suspenders, but a comment explaining the intentional layering would prevent future maintainers from removing one layer thinking it is dead code. 5. Duplicate type compatibility logic
6. The Performance7. New In Correctness8. Schema partition change in Adding
Is there a migration strategy for existing deployments? If this targets greenfield deployments only, that should be documented clearly. 9.
Test Coverage10. No unit tests for
The integration test delta (+10/-5 lines) is minimal for a feature touching the full INSERT/SELECT round-trip. A test exercising Minor
|
Upgrade dependency stack to use delta-rs's WIP variant-type PR (#4325) and
all compatible crate versions:
- deltalake → fork of abhiaagarwal/delta-rs@abhi/variant-type with two
timefusion-specific patches (tonyalaribe/delta-rs-timefusion@timefusion-fixes):
* default schema_force_view_types to false (variant Binary subfields
otherwise become BinaryView during scan, failing kernel write
validation against unshredded_variant())
- delta_kernel → buoyant_kernel 0.22 (the kernel the PR depends on)
- arrow / arrow-* / parquet / parquet-variant* → 58
- datafusion / datafusion-* → 53.1.0
- object_store → 0.13.2 (trait split: convenience methods moved to
ObjectStoreExt; _opts variants now required)
- datafusion-postgres → 0.16, datafusion-functions-json → 0.53,
datafusion-tracing → 53.0.1, serde_arrow → 0.14 (arrow-58),
datafusion-variant → upstream contrib main
- rust toolchain → 1.91
Code changes driven by the upgrade:
- ObjectStore trait surface (object_store_cache.rs): merge cache logic
into put_opts/get_opts/put_multipart_opts; add delete_stream + copy_opts;
move internal calls to ObjectStoreExt
- ExecutionPlan::properties now returns &Arc<PlanProperties>; wrap fields
accordingly in DmlExec, VariantToJsonExec, VariantConversionExec
- DeltaTable::version() returns Result<u64>; bump last_written_versions
map value type and statistics cache to u64
- OptimizeBuilder::with_target_size takes NonZero<u64>
- WriterPropertiesBuilder::set_max_row_group_size deprecated → use
set_max_row_group_row_count(Some(_))
- parquet 58 variant fields use Binary (not BinaryView) for metadata/value
to match delta_kernel's unshredded_variant() layout; cast
VariantArrayBuilder output accordingly
- Set delta.dataSkippingNumIndexedCols=-1 on table creation so kernel can
evaluate predicates on columns past the default 32-leaf-column stats
cutoff (we have 90 fields). Without this, IS NOT NULL / equality
pushdown on columns like resource___service___name failed with
"No such field" when combined with another predicate
- Drop v2 checkpointPolicy: combined with variant feature it currently
fails buoyant_kernel's protocol validation (Reader/Writer feature
asymmetry); falls back to v1 checkpoints
- VariantInsertRewriter no longer recurses into child plans (indices
aligned to dml.input only); only wraps non-null Utf8 literals
- VariantSelectRewriter skips LogicalPlan::Dml so it doesn't wrap INSERT
projections in variant_to_json
- Tag Variant fields with the canonical arrow.parquet.variant extension
type so datafusion-variant UDFs recognize them at runtime
- Present Variant columns as Utf8View on TableProvider::schema() for the
SQL planner (datafusion's LogicalPlanBuilder::values rejects Utf8→Struct
cast and exposes no hook to register one); DataSink::write_all converts
Utf8 columns back to Variant structs before the Delta write
sqllogictest harness:
- Decode binary NUMERIC via a custom PgNumeric/FromSql wrapper (UInt64
from array_length/json_length is now NUMERIC; tokio-postgres has no
built-in decoder without with-rust_decimal-1)
- Map "numeric" to DefaultColumnType::Integer so `query I` accepts
UInt64-backed counts
Test results: 92/92 pass across lib, integration, sqllogictest (all 11
SLT files), connection_pressure, cache_performance, delta_checkpoint_cache,
dml_operations, postgres_json_functions, custom_functions, statistics,
grpc_ingest, buffer_consistency, delta_rs_api.
… on open-ended MemBuffer queries
Code Review — PR #16: Variant SELECT Rewriter & Architecture DocsThis is a substantial, well-motivated PR. The core ideas (VariantSelectRewriter, WAL Arrow IPC migration, k-way merge recovery, unified table storage) are architecturally sound. I've organised feedback by severity. 🔴 Breaking Changes — Needs Documentation / Migration PathWAL version bump (128 → 130) with no migration path The comment in
WAL sharding changes existing topic keys Existing WAL topics were written to Environment variable rename
Default data directory changed from absolute to relative path // before
const_default!(d_wal_dir: PathBuf = "/var/lib/timefusion/wal");
// after
const_default!(d_data_dir: PathBuf = "./data");
🟠 Security
gRPC auth uses non-constant-time string comparison (
Some(t) if t == expected => Ok(()),
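A minimal sketch of the comparison this finding asks for, using only the standard library. `constant_time_eq` and `check_token` are hypothetical names; the length check still reveals whether lengths differ, and an optimizing compiler gives no formal timing guarantee, so production code would more likely rely on a vetted crate such as `subtle`.

```rust
/// Compares two byte strings without returning early at the first mismatch.
/// Note: the length check still leaks whether the lengths differ.
pub fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Accumulate all differences so the loop always runs to the end.
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

/// Hypothetical replacement for the `Some(t) if t == expected` match arm.
pub fn check_token(provided: Option<&str>, expected: &str) -> Result<(), &'static str> {
    match provided {
        Some(t) if constant_time_eq(t.as_bytes(), expected.as_bytes()) => Ok(()),
        _ => Err("unauthenticated"),
    }
}
```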
🟡 Correctness / Behaviour
The integration test was changed from: // SELECT * from otel_logs_and_spans → 89 columnsto: // Verify we can query specific columns (SELECT * fails due to Variant column encoding)
let row = client.query_one("SELECT id, name, status_code, level ...", ...)This weakens regression coverage and leaves a known broken case. If
SELECT payload FROM t WHERE payload->>'key' = 'val'…which DataFusion might plan as
let mut delta_table = table_lock.write().await;
delta_table.update_state().await...
let (new_table, rows_affected) = operation(delta_table.clone()).await?;The write lock is held across an async network call ( 🟡 Performance
pub fn is_table_indexed(&self, table: &str) -> bool {
self.enabled() && self.indexed_tables().iter().any(|t| t == table)
}
WAL shard counter DashMap entries never cleaned up
🟢 Positive Changes Worth Calling Out
Minor
|
…compress
Hot writes stay at zstd=3 for ingest latency. A daily cron rewrites partitions >=7d at zstd=9 (cool) and >=30d at zstd=19 (cold), using Z-order when the schema declares z_order_columns and Compact otherwise. Skip-already-upgraded is enforced via a Parquet footer KV (timefusion.compression_tier) probed once per partition.
Per-column bloom filters are opt-in via schema YAML (bloom_filter: true), sized to ~1M-row row groups (fpp=0.01, ~1.7MB/col) instead of the legacy global 100k that produced near-1.0 false-positive rates at scale. High-entropy free-text columns get dictionary: false opt-out to skip the wasted 8MB-and-fall-back-to-PLAIN writer pass.
Three production bugs fixed along the way:
- Global set_bloom_filter_fpp() re-enables blooms via side-effect in parquet-rs even after set_bloom_filter_enabled(false), and uses the default NDV (~1M), causing massive bloom allocations and write hangs. Removed the global call; per-column blooms set their own fpp.
- table.table_url() appends ?endpoint=... on non-AWS backends (MinIO) but get_file_uris() returns clean URIs; prefix matching failed silently and the recompress probe always returned None, defeating the skip optimization. Strip query string before matching.
- ParquetObjectReader was being constructed with head()'s meta.location (bucket-relative) instead of the object-store-relative path, causing double-prefixing and a 404. Pass the original path.
Tests:
- 8 unit tests for build_writer_properties (tier/encoding/bloom/dict).
- 1 integration test for recompress_partition exercising first rewrite, idempotent rerun, and downgrade skip.
Also removes VariantToJsonExec, which was already disconnected from the active scan path (wrap_result = identity) and retained only as a speculative fallback. Git history is the real fallback.
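The tiering rule described above (hot zstd=3, cool at >=7 days zstd=9, cold at >=30 days zstd=19, with already-upgraded partitions skipped) can be sketched as follows. `CompressionTier` and `needs_recompress` are hypothetical names, and treating a partition with no footer key as hot is an assumption made for this sketch.

```rust
/// Variant order matters: the derived `Ord` gives Hot < Cool < Cold.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum CompressionTier {
    Hot,
    Cool,
    Cold,
}

impl CompressionTier {
    /// Tier a partition should be in, given its age in days.
    pub fn for_age_days(age_days: u32) -> Self {
        if age_days >= 30 {
            CompressionTier::Cold
        } else if age_days >= 7 {
            CompressionTier::Cool
        } else {
            CompressionTier::Hot
        }
    }

    /// zstd level associated with each tier in the commit message.
    pub fn zstd_level(self) -> i32 {
        match self {
            CompressionTier::Hot => 3,
            CompressionTier::Cool => 9,
            CompressionTier::Cold => 19,
        }
    }
}

/// `current` is the tier recorded in the Parquet footer, if any.
/// A rewrite happens only when the target tier is strictly higher, which
/// covers both the idempotent rerun and the downgrade skip.
pub fn needs_recompress(current: Option<CompressionTier>, age_days: u32) -> bool {
    let target = CompressionTier::for_age_days(age_days);
    // Assumption: no footer key means the file was written on the hot path.
    target > current.unwrap_or(CompressionTier::Hot)
}
```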
- autotune: derive memory/disk/parallelism from total RAM, free disk,
and CPU count when the corresponding env var is unset. User overrides
always win. Applied in init_config() before the OnceLock seals.
- metrics: OTel meter provider on the same OTLP endpoint as traces.
Observable gauges (oldest_bucket_age_seconds, pressure_pct, mem bytes,
rows, wal disk/files) poll snapshot_stats() each export cycle. Hot-path
counters for inserts, ingest errors, WAL corruption, flush success/fail,
and query executions.
- WAL corruption: warn -> error at every deserialize/replay site; failing
entries are quarantined to {wal_dir}/quarantine/ with .bin + .meta
sidecars for post-mortem. Threshold-based hard bail unchanged.
- Schema-driven time column: TableSchema gains time_column (defaults to
"timestamp"); timestamp_to_date_filter takes the column name so date
partition pruning works for schemas using non-standard names.
- stats_table: surface oldest_bucket_age_secs row.
- Drop TIMEFUSION_TANTIVY_ENABLED. Indexing is now always-on for any table whose YAML schema declares `tantivy.indexed: true` on at least one column. The optional `TIMEFUSION_TANTIVY_INDEXED_TABLES` env list is now an additive override for dynamic tables not in the static registry.
- TantivyPredicateRewriter: new AnalyzerRule that detects `col = 'literal'` and `col LIKE 'pattern'` predicates on indexed columns and additively AND-s a `text_match(col, q)` call. Supports exact equality and trailing-wildcard LIKE (`'prefix%'`). Conservative on tantivy QueryParser metachars: bails to original predicate. Correctness preserved: the original `=` / `LIKE` is never removed, so MemBuffer rows and freshly-flushed-not-yet-indexed Delta files still evaluate it directly.
- Bounded prefilter: search service accepts a max_hits cap; routing layer also skips the IN-list pushdown when selectivity exceeds the configured threshold (default 50%); IN(N) above ~100k is the bottleneck, not S3.
- OTel metrics: `tantivy.index_lag_seconds` gauge (now - newest published max_timestamp), plus counters for prefilter_attempts, prefilter_used, prefilter_skipped, prefilter_errors.
- Tests: 10 analyzer-level tests in tests/tantivy_transparent_test.rs covering rewrite correctness, idempotency, non-indexed-column skip, unsupported LIKE patterns, metachar bail, and indexed-table auto-discovery.
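The pattern handling described for TantivyPredicateRewriter (exact match, trailing-wildcard prefix, bail on anything else) might look roughly like the sketch below. `LikeShape` and `classify_like` are hypothetical names, and the metacharacter list is an illustrative assumption rather than the set the real rule or tantivy's QueryParser uses.

```rust
/// How a LIKE pattern could be served by a text index.
#[derive(Debug, PartialEq, Eq)]
pub enum LikeShape<'a> {
    /// No wildcards at all: behaves like equality.
    Exact(&'a str),
    /// `'prefix%'`: a single trailing wildcard.
    Prefix(&'a str),
    /// Anything else: leave the original predicate alone.
    Unsupported,
}

/// Illustrative set of query-parser metacharacters (an assumption).
const QUERY_METACHARS: &[char] = &[
    '+', '-', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':', '\\',
];
/// SQL LIKE wildcards.
const LIKE_WILDCARDS: &[char] = &['%', '_'];

pub fn classify_like(pattern: &str) -> LikeShape<'_> {
    // Be conservative: any metacharacter means no rewrite is attempted.
    if pattern.is_empty() || pattern.contains(QUERY_METACHARS) {
        return LikeShape::Unsupported;
    }
    match pattern.strip_suffix('%') {
        Some(prefix) if !prefix.is_empty() && !prefix.contains(LIKE_WILDCARDS) => {
            LikeShape::Prefix(prefix)
        }
        None if !pattern.contains(LIKE_WILDCARDS) => LikeShape::Exact(pattern),
        _ => LikeShape::Unsupported,
    }
}
```

Because the rewrite only adds a conjunct and never removes the original predicate, returning `Unsupported` costs performance but never correctness.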
- Add `ngram3` tokenizer (3-gram + lowercase + ASCII-fold + 256-char cap). Registered on both index creation and reader open since tantivy's TokenizerManager is per-Index and not persisted.
- Make `ngram3` the default tokenizer when YAML omits `tokenizer:`. Most log/trace text queries are `LIKE '%substr%'`; substring search needs to be the fast path, not a special opt-in. Net index cost on English ASCII: typically 1.5-2x vs word tokenizer (trigram dict is bounded by ~10k entries). Enums opt down to `tokenizer: raw`.
- Extend TantivyPredicateRewriter to handle:
  * `LIKE '%suffix'` on ngram3 columns (term match)
  * `LIKE '%infix%'` on ngram3 columns (term match)
  * `ILIKE` on ngram3 and default columns (both lowercase the literal)
- Production YAML: switch name/status_message/body/attributes/summary to ngram3; keep level/kind/status_code as raw (enums).
- QueryParser now uses conjunction-by-default: multi-trigram queries AND their parts (any-of would be a correctness bug: a single matching trigram doesn't imply substring presence).
- Bump SCHEMA_VERSION 1→2: old indexes can't be queried with the new analyzer chain. Search skips them; they're replaced on next flush.
- Drop TIMEFUSION_TANTIVY_INDEXED_TABLES env override; schema is the single source of truth. No knobs nobody asked for.
- Update e2e tests to use natural SQL (`WHERE level = 'ERROR'`, `WHERE name LIKE '%...%'`) instead of explicit text_match() calls, so they actually exercise the rewriter path.
- 105/105 tests pass: lib 74, transparent 16, search 5, storage 4, index 6.
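To make the conjunction point concrete, here is a std-only sketch of trigram matching. It is not tantivy's tokenizer: ASCII folding and the 256-char cap are omitted, and `trigrams` and `may_contain` are hypothetical names.

```rust
use std::collections::HashSet;

/// Lowercased character 3-grams of `text`.
pub fn trigrams(text: &str) -> HashSet<String> {
    let chars: Vec<char> = text.chars().flat_map(|c| c.to_lowercase()).collect();
    chars
        .windows(3)
        .map(|w| w.iter().collect::<String>())
        .collect()
}

/// Conjunctive lookup: every trigram of `needle` must be present in the document's set.
/// This is necessary but not sufficient for a substring match, so the original
/// LIKE predicate still has to be evaluated on the candidates.
/// Needles shorter than 3 characters produce no trigrams and return false,
/// meaning the index cannot help and the caller must fall back.
pub fn may_contain(doc_grams: &HashSet<String>, needle: &str) -> bool {
    let query = trigrams(needle);
    !query.is_empty() && query.iter().all(|g| doc_grams.contains(g))
}
```

A document such as `abcd bcde` contains every trigram of `abcde` without containing the substring itself, which is why the index result is treated as a prefilter and the original predicate stays in the plan.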
Closes the ~10 minute window where freshly-inserted rows lived in MemBuffer but couldn't use the tantivy prefilter. Per-bucket tantivy indexes now materialize JIT on first text_match query and are queried alongside the Delta sidecar indexes; results are unioned into a single `id IN (..)` filter applied to both Delta and MemBuffer scans.
Lifecycle:
- Built lazily on first query (no insert-time CPU cost when no queries).
- Cached on TimeBucket until row_count grows past indexed_rows; next query rebuilds. Insert sets the cache to None directly so even bounded-bucket workloads with frequent ingest rebuild correctly.
- Dropped automatically when the bucket drains or is evicted (cache lives on the bucket struct, freed with it).
Routing changes in ProjectRoutingTable::scan:
- Compute delta_ids from the S3 sidecar (existing path).
- Compute mem_ids from MemBuffer's per-bucket indexes (new).
- Union into combined_ids (IDs are globally unique, no double-count).
- Apply `id IN (combined_ids)` to BOTH Delta and MemBuffer scans.
- Original predicate stays in the plan as correctness backstop.
Memory profile: each cached index holds ~2x indexed text bytes in postings. For a 10-min bucket of moderate log volume this is tens to low hundreds of MB. Buckets drain after flush, so the cache is bounded by the flush_interval window.
Also reverted SCHEMA_VERSION 2→1 since this branch hasn't shipped; no need to invalidate indexes that don't exist yet.
Tests: 3 new in mem_buffer covering JIT build, unindexed-table no-op, and cache invalidation on insert. Full suite: 108/108 pass.
The previous design computed mem_ids in MemBuffer.search_text_match then applied id IN(mem_ids) inside a SEPARATE query_partitioned call. Between those calls a concurrent insert could land a row in the bucket, visible to query_partitioned's snapshot but absent from the pre-computed id set, so the new row was silently dropped from results.
Fix: fold prefilter + snapshot + id-filter into one atomic per-bucket operation. New API query_partitioned_with_text_match takes filters and text_match preds; for each bucket it grabs batches under the lock, builds or reuses the cached tantivy index (sized against the same snapshot it just took), searches, then filters the snapshot by id IN(ids). The caller never sees partial state.
Insertion now holds the batches lock across the push AND the cache invalidation so any reader sees either (cache matching the snapshot it just took) or (None - rebuild from this snapshot). No torn states.
Routing in ProjectRoutingTable::scan simplified accordingly:
- Delta side: keeps its own id IN filter from the sidecar tantivy. Delta files contain only flushed data, so MemBuffer ids never apply there.
- MemBuffer side: calls query_partitioned_with_text_match which handles its own atomic prefilter inside the bucket lock. Caller passes text match preds; no manual id IN on the MemBuffer filter list.
Tests: 3 new in mem_buffer covering the atomic-snapshot guarantee + the empty-preds fall-through. Full suite 108/108 still pass.
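The locking discipline described in this commit can be sketched with a plain `std::sync::Mutex`. Everything here is a stand-in: `Bucket`, `CachedIndex`, and `query_with_text_match` are hypothetical, rows are `(id, text)` pairs rather than Arrow batches, and the "index" is a vector of lowercased strings instead of a tantivy index. The point is only that snapshot, index build or reuse, search, and filtering all happen under one lock, and that insert invalidates the cache under that same lock.

```rust
use std::sync::Mutex;

/// Stand-in for the cached per-bucket text index.
struct CachedIndex {
    indexed_rows: usize,
    lowered: Vec<String>,
}

#[derive(Default)]
struct BucketState {
    rows: Vec<(u64, String)>, // (id, text)
    cached_index: Option<CachedIndex>,
}

#[derive(Default)]
pub struct Bucket {
    state: Mutex<BucketState>,
}

impl Bucket {
    /// Push and cache invalidation happen under the same lock, so a reader
    /// sees either a cache matching its snapshot or no cache at all.
    pub fn insert(&self, id: u64, text: &str) {
        let mut s = self.state.lock().unwrap();
        s.rows.push((id, text.to_string()));
        s.cached_index = None;
    }

    /// Snapshot, index build/reuse, search, and id filtering in one critical section.
    pub fn query_with_text_match(&self, needle: &str) -> Vec<u64> {
        let mut s = self.state.lock().unwrap();
        let row_count = s.rows.len();
        // Reject a cache that was not built against exactly this snapshot.
        let stale = s
            .cached_index
            .as_ref()
            .map_or(true, |c| c.indexed_rows != row_count);
        if stale {
            let lowered: Vec<String> = s.rows.iter().map(|(_, t)| t.to_lowercase()).collect();
            s.cached_index = Some(CachedIndex { indexed_rows: row_count, lowered });
        }
        let needle = needle.to_lowercase();
        let index = s.cached_index.as_ref().unwrap();
        index
            .lowered
            .iter()
            .zip(s.rows.iter())
            .filter(|(text, _)| text.contains(&needle))
            .map(|(_, (id, _))| *id)
            .collect()
    }
}
```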
…, tantivy build metric
- pgwire_handlers: AuthConfig::from_core() requires explicit PGWIRE_PASSWORD. Empty/missing fails startup unless TIMEFUSION_ALLOW_INSECURE_AUTH=true is set (dev only, logs a loud warning). Previously an unset password silently defaulted to "" and the cleartext handler accepted any client.
- main: GRPC_TOKEN required by the same opt-out env. Previously an unset token meant gRPC ingest accepted any client.
- buffered_write_layer::delete / update: propagate WAL append errors as DataFusionError instead of warn-and-continue. A WAL write that fails on disk-full / fsync error must NOT apply in MemBuffer; on the next restart WAL replay would reconstruct without the delete/update and the data would silently come back. INSERT already propagated correctly; DML now matches.
- buffered_write_layer::quarantine_entry: write with mode 0o600 (owner-only) on Unix. Quarantine files hold raw user data that failed to deserialize; world-readable was a leak risk on shared hosts.
- metrics: add tantivy_build_failures counter. flush_bucket increments it on the post-Delta tantivy callback warn path so ops can alert on accumulating index drift (silent UDF-fallback degradation otherwise invisible).
Tests: 110/110 pass. Auth changes guarded behind opt-out env so existing dev/test environments are unaffected; production deployments must set PGWIRE_PASSWORD + GRPC_TOKEN.
- gRPC graceful shutdown (#7): wrap tonic in serve_with_shutdown, catch SIGTERM (not just SIGINT) so k8s rolling restarts drain cleanly. Shutdown order: signal gRPC → wait for drain (bounded by shutdown timeout) → flush buffered layer → shutdown database. Previously gRPC was tokio::spawned with no drain, so SIGTERM dropped in-flight writes.
- Per-project ingest metrics (#6): record_insert / record_ingest_error now take (project_id, table_name) and attach them as KeyValue attributes. Cardinality ~2k series at typical multi-tenant scale, well within OTel limits. Lets ops slice noisy-neighbor and per-tenant SLA breaches.
- Bucket-index LRU eviction (#8): moved per-bucket text index cache from TimeBucket onto MemBuffer as an LruCache<BucketCacheKey, _>. Each BucketTextIndex now carries a `size_bytes` estimate (2× indexed-text bytes); the cache enforces a byte budget defaulted to 25% of MemBuffer max memory, evicting LRU tail when exceeded. Insert + drain + evict_old_data all call cache_invalidate so dead entries free budget immediately. Correctness invariant (indexed_rows == snapshot_rows) preserved, so cache-stale entries are still rejected on lookup.
GRPC_TOKEN posture: kept required-with-opt-out (TIMEFUSION_ALLOW_INSECURE_AUTH=true for local dev), symmetric with PGWIRE_PASSWORD.
Tests: 110/110 pass. The MemBuffer cache refactor changed a private API (TableBuffer::insert_batch now returns (bytes, bucket_id)); no callers outside MemBuffer.
Covers: required env vars, liveness/readiness probe sizing, backup & restore via S3 versioning + Delta time-travel, graceful shutdown sequencing, schema migration semantics, monitoring/alerting pointers, disk capacity planning, and common incident playbooks (stuck flushes, tantivy build failures, WAL corruption, slow cold start). Pairs with the alerting recipe in personal memory and the architecture overview in CLAUDE.md.
PR Review: Add Variant SELECT rewriter and comprehensive architecture docsOverviewThis PR introduces a two-pass Security
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
struct StorageConfig {
s3_access_key_id: String,
s3_secret_access_key: String, // ← exposed by {:#?}
...
}If this struct is ever passed to a fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StorageConfig")
.field("project_id", &self.project_id)
.field("s3_access_key_id", &"[redacted]")
.field("s3_secret_access_key", &"[redacted]")
.finish()
}Bugs / Correctness
const HARD_LIMIT_MULTIPLIER: usize = 5; // max_bytes + max_bytes/5 = 120%
// used as:
let hard_limit = max_bytes.saturating_add(max_bytes / HARD_LIMIT_MULTIPLIER);The math is correct (20% headroom → 120% cap), but naming a divisor
(Some(_), None) | (None, Some(_)) => true,The
Known Gaps (documented but worth tracking)
Test Coverage
Code Quality (minor)
Strengths
SummaryThe Variant SELECT rewriter is well-designed and the unified table model simplifies multi-tenancy. The main actionable items are: (1) redact |
- StorageConfig drops the derive(Debug) and gains a manual impl that
redacts s3_access_key_id / s3_secret_access_key. Derived Debug was a
credential exposure waiting for a stray debug!() or {:?} log line.
Serialize stays (used by sqlx::FromRow); noted as a future audit
point.
- HARD_LIMIT_MULTIPLIER (a divisor named 'multiplier') →
HARD_LIMIT_HEADROOM_DIVISOR. Math unchanged (5 → +20% → 120% cap);
name now matches the operator.
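A minimal sketch of the redaction described in this commit, assuming a trimmed-down `StorageConfig` with only three fields (the real struct has more, and the field set here is illustrative). It shows a hand-written `Debug` impl that prints a fixed placeholder instead of the credential values, so both `{:?}` and `{:#?}` stay safe:

```rust
use std::fmt;

// Hypothetical, trimmed-down stand-in for the StorageConfig discussed above.
pub struct StorageConfig {
    pub project_id: String,
    pub s3_access_key_id: String,
    pub s3_secret_access_key: String,
}

// Manual Debug: non-secret fields print normally, credentials print a placeholder.
impl fmt::Debug for StorageConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("StorageConfig")
            .field("project_id", &self.project_id)
            .field("s3_access_key_id", &"[redacted]")
            .field("s3_secret_access_key", &"[redacted]")
            .finish()
    }
}
```

Because `debug_struct` drives both the compact and the pretty-printed form, a stray `debug!("{:#?}", cfg)` would emit the placeholder rather than the secret. This covers `Debug` only; any `Serialize` path needs its own handling.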
Code Review — PR #16: Add Variant SELECT rewriter and comprehensive architecture docs
OverviewThis PR delivers four interlocking features on top of a restructured storage model:
The code is generally well-crafted and the inline documentation is excellent. 🚨 Critical / Correctness1. Plan cache has no invalidation path
The simplest fix is to include a schema-version token (e.g. a monotonic 2. VariantSelectRewriter silently skips
Adding
|
…s.slt

Root cause: ->> routed through variant_get(col, path, 'Utf8'), but parquet_variant_compute returns NULL for numeric/boolean leaves cast to Utf8; only strings worked.

Postgres ->> needs:
- string → unquoted text
- number → text representation
- bool → 'true' / 'false'
- null → SQL NULL
- obj/arr → JSON text

Fix: compose json_to_pg_text(variant_to_json(variant_get(col, path))). The new json_to_pg_text UDF parses the JSON output: enclosing quotes trigger a serde_json::from_str::<String>() so escapes resolve, JSON 'null' → SQL NULL, everything else passes through unchanged. -> (Arrow) keeps returning the Variant leaf so chained -> still works.

Re-enables tests/slt/variant_functions.slt (drop the README), and corrects the remaining ->> expectations from quoted-JSON to Postgres text (deep, two, alice@example.com); the impl is now the source of truth for these semantics.
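The `->>` mapping in this commit message can be sketched on plain strings. This is a std-only illustration, not the UDF itself: the real `json_to_pg_text` works on Arrow arrays and uses serde_json, whereas the hand-rolled `unescape_json_string` helper below is hypothetical and deliberately does not decode `\u` surrogate pairs (such input falls through unchanged).

```rust
/// Decode the body of a JSON string literal (without the enclosing quotes).
/// Returns None if the body is not a single well-formed string body.
fn unescape_json_string(body: &str) -> Option<String> {
    let mut out = String::with_capacity(body.len());
    let mut chars = body.chars();
    while let Some(c) = chars.next() {
        match c {
            // An unescaped quote means the input was not one string literal.
            '"' => return None,
            '\\' => match chars.next()? {
                '"' => out.push('"'),
                '\\' => out.push('\\'),
                '/' => out.push('/'),
                'b' => out.push('\u{0008}'),
                'f' => out.push('\u{000C}'),
                'n' => out.push('\n'),
                'r' => out.push('\r'),
                't' => out.push('\t'),
                'u' => {
                    let hex: String = chars.by_ref().take(4).collect();
                    if hex.len() != 4 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
                        return None;
                    }
                    let code = u32::from_str_radix(&hex, 16).ok()?;
                    // Surrogate halves are rejected here (sketch limitation).
                    out.push(char::from_u32(code)?);
                }
                _ => return None,
            },
            other => out.push(other),
        }
    }
    Some(out)
}

/// JSON text -> Postgres `->>` text. None stands for SQL NULL.
pub fn json_to_pg_text(json: &str) -> Option<String> {
    let t = json.trim();
    if t == "null" {
        return None;
    }
    if t.len() >= 2 && t.starts_with('"') && t.ends_with('"') {
        if let Some(s) = unescape_json_string(&t[1..t.len() - 1]) {
            return Some(s);
        }
    }
    // Numbers, booleans, objects and arrays pass through as their JSON text.
    Some(t.to_string())
}
```

Under these assumptions, `"deep"` becomes `deep`, `2` stays `2`, and input that merely starts and ends with quotes (such as `"a"+"b"`) is passed through rather than unquoted.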
#2 wrap_root_projection::peel now descends through LogicalPlan::Filter. Some DataFusion rewrites promote a Filter above the outermost Projection (filter_null_join_keys, custom rules); the previous peel hit the 'other' arm and shipped raw Variant binary to the wire. The 'other' arm's warn! is also escalated to flag 'RAW BINARY VARIANT ON THE WIRE' in caps, an actionable on-call signal.

#1 Plan-cache schema-staleness: today schemas are immutable (loaded via include_dir! at compile time), so cached LogicalPlans can't reference a stale shape. Document the invariant in the module header so a future hot-reload feature flips this from 'safe by construction' to 'needs invalidation' explicitly.
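To make the peel behaviour concrete, here is a toy model, assuming a simplified `Plan` enum in place of DataFusion's `LogicalPlan` and an illustrative `MAX_PEEL` of 64 (the real bound is not stated in this thread). It walks down through pass-through nodes, now including `Filter`, until it reaches a `Projection` or gives up:

```rust
// Hypothetical, simplified plan tree; DataFusion's LogicalPlan has many more variants.
pub enum Plan {
    Projection(Vec<String>),
    Filter(Box<Plan>),
    Sort(Box<Plan>),
    Limit(Box<Plan>),
    Union(Vec<Plan>),
}

// Illustrative depth bound, not the project's actual value.
const MAX_PEEL: usize = 64;

/// Return the outermost Projection's column list, if one is reachable
/// through single-input pass-through nodes within MAX_PEEL steps.
pub fn peel(plan: &Plan) -> Option<&[String]> {
    let mut current = plan;
    for _ in 0..MAX_PEEL {
        match current {
            Plan::Projection(cols) => return Some(cols.as_slice()),
            // Filter is peeled just like Sort and Limit.
            Plan::Filter(input) | Plan::Sort(input) | Plan::Limit(input) => {
                current = &**input;
            }
            // Multi-input roots cannot be peeled by this walk.
            Plan::Union(_) => return None,
        }
    }
    None
}
```

The loop is iterative, so the depth bound alone limits the work and no recursion depth is involved in this sketch.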
Code Review — PR #16: Variant SELECT Rewriter & Architecture DocsThis is a substantial PR (~31k additions). The documentation additions look thorough and accurate. Below is a focused review of the code changes. Critical / Data Correctness Issues
Security
No input validation on project_id/table_name at trust boundaries Correctness / Bugs
Performance
Error Handling
Test Coverage Gaps
Minor
Overall this is a well-structured PR. The |
Code Review — PR #16: Add Variant SELECT rewriter and comprehensive architecture docsThis is a large, ambitious PR that moves the system forward on several fronts simultaneously. Overall the code is well-structured and defensive, but the scope is significant and a few patterns warrant attention before merge. Overview
Correctness Issues1. The 2. let has_uncommitted = self.buffered_layer.is_some_and(|l| l.has_table(...));
// no lock held here
let has_committed = { ... read custom_tables and unified_tables ... };Between the 3. For 4. Plan cache schema-staleness assumption lacks a cross-reference ( The module comment correctly notes: "If we ever add hot-reload of YAML schemas, this cache must also gain a schema-version token." This critical invariant lives only in one comment. Consider adding a cross-reference in Security5. The manual 6. gRPC handler: no project-scoped authorization on An authenticated client can write to arbitrary Performance7. info!("Loaded config: {}/{}", config.project_id, config.table_name);At scale (thousands of custom project tables), this floods startup logs. Recommend a single summary 8. let canonical = statement.to_string();
if !Self::cacheable(statement, &canonical) { return None; }The AST is serialized on every extended-query Code Quality9. WAL version bump is a breaking upgrade but RUNBOOK.md has no upgrade procedure
10. Union/Intersect/Except at SELECT root emits raw binary to the wire ( The Test CoverageThe new test files are substantial. A few gaps worth noting:
SummaryMust fix before merge:
Should fix:
Nice to have:
The underlying approach is sound and the code shows careful attention to correctness: constant-time auth, redacted |
#5 StorageConfig: Serialize would expose creds even though Debug redacts. Add #[serde(serialize_with = redact_str)] on s3_access_key_id and s3_secret_access_key. sqlx::FromRow bypasses serde so DB load is unaffected.

#7 load_storage_configs: per-entry info!('Loaded config: …') floods logs at scale (thousands of custom project tables). Demote per-entry to debug! and emit one info! summary count.

#8 plan_cache: statement.to_string() ran *before* the cacheability check, serializing the AST on every Parse message even for uncacheable statements. Split into kind_is_cacheable() (cheap AST-variant match) and has_placeholder(&str). Reorder to check the AST variant first.

#4 schema_loader::registry(): pull the load-bearing 'caches assume immutable registry' invariant out of plan_cache.rs and document it at the source of truth, listing every downstream cache that relies on it. Future hot-reload work can't miss this.

#9 RUNBOOK.md: add a 'WAL format upgrades' section with the explicit drain → backup → wipe → restart procedure. Previously the only note was in the WAL_VERSION code comment.
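The reordering in #8 can be illustrated with a toy statement type. Everything here is an assumption for illustration: the `Statement` enum stands in for the SQL AST, `to_sql` stands in for `statement.to_string()`, and the rule "only queries with a `$n` placeholder are cached" is a guess at the policy, not something this thread confirms. The `SERIALIZATIONS` counter exists only to make the saved work observable.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Counts how often the (comparatively expensive) serialization runs.
pub static SERIALIZATIONS: AtomicUsize = AtomicUsize::new(0);

// Hypothetical stand-in for the parsed SQL statement.
pub enum Statement {
    Query(String),
    Insert(String),
    Set(String),
}

impl Statement {
    // Stand-in for statement.to_string(): walks the AST and allocates.
    fn to_sql(&self) -> String {
        SERIALIZATIONS.fetch_add(1, Ordering::Relaxed);
        match self {
            Statement::Query(s) | Statement::Insert(s) | Statement::Set(s) => s.clone(),
        }
    }
}

// Cheap check: looks only at the AST variant (assumed policy: queries only).
fn kind_is_cacheable(stmt: &Statement) -> bool {
    matches!(stmt, Statement::Query(_))
}

// Same byte-window scan as quoted in the review.
fn has_placeholder(sql: &str) -> bool {
    sql.as_bytes()
        .windows(2)
        .any(|w| w[0] == b'$' && w[1].is_ascii_digit())
}

/// Returns the cache key for cacheable statements, None otherwise.
pub fn cache_key(stmt: &Statement) -> Option<String> {
    if !kind_is_cacheable(stmt) {
        return None; // bail out before paying for serialization
    }
    let canonical = stmt.to_sql();
    if !has_placeholder(&canonical) {
        return None;
    }
    Some(canonical)
}
```

With this ordering a `SET` or `INSERT` never reaches `to_sql`, which is the saving the commit describes.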
Code Review — PR #16: Add Variant SELECT rewriter and comprehensive architecture docsOverall this is a well-designed PR with excellent inline documentation explaining the why behind design decisions. The core Bugs / Correctness1. Timing side-channel in // The comment acknowledges this, but it IS a real issue:
// "Equal-length plaintexts compare in time independent of contents;
// length mismatch short-circuits"
Some(t) if bool::from(t.as_bytes().ct_eq(expected.as_bytes())) => Ok(()),
2. if let Expr::ScalarFunction(sf) = expr && sf.func.name() == "variant_to_json" {
return false;
}This string-matching guard is a gotcha: if 3. Possible double-wrap on re-analysis (
WAL Breaking Change (Operator Risk)4. WAL version bump to 131 requires wiping the data directory ( // "Bumps: 131: Walrus collection key uses deterministic FNV-1a instead of AHasher
// (AHasher's per-build seed silently stranded entries on upgrade).
// wipe the WAL directory if upgrading."
const WAL_VERSION: u8 = 131;The comment is there, but this is a sharp upgrade edge. Existing deployments that
Known Limitations (Documented, but Worth Tracking)5. Variant wrapping skipped for // "Shapes like `INSERT … SELECT col FROM staging` … warn! so the limitation is visible"The 6. Union/Intersect/Except at root produces raw Variant bytes on the wire ( The Performance7. The measurement justification is thorough and credible. One note: the measurement was done at 4.7k inserts on a single project. If multi-project workloads or large Variant payloads materialize significantly more DashMap overhead, 1.15x might be tight. Recommend monitoring 8. Allocation is per statement (not per row), so this is fine. Just noting it's not a static — if allocation shows up in profiles for high-throughput INSERT workloads, hoisting this to a Minor Code Quality9. let has_committed = {
let custom_tables = ...contains_key(&(self.project_id, self.table_name))
|| unified_tables.contains_key(self.table_name) // ← no project_id filter
};The comment explains this is intentional (unified tables are shared), but the asymmetry between custom (keyed by 10. Vendored The vendor directory contains Positive Highlights
CIThe test job runs |
…static cache

From a three-agent code-review pass (reuse / quality / efficiency):

- Two Expr-extracting `extract_project_id` fns diverged: dml.rs handled And + .to_string() (loose on non-Utf8 scalars), database.rs handled Not + explicit Utf8/Utf8View matches. Promote to optimizers::extract_project_id_from_expr (handles And + Not + explicit Utf8/Utf8View/LargeUtf8 string scalars). Both callers now point here; drops ~30 lines and removes a latent correctness gap.
- JsonToPgTextUdf::invoke_with_args was two-pass (Vec<Option<String>> then iterate) with three near-identical Utf8/Utf8View/LargeUtf8 match arms. Cast once to Utf8 via arrow::compute, single pass into a StringBuilder. Also switch the unquoting to serde_json::from_str::<Value>() + match on JsonValue::String, which handles JSON escapes correctly and stops false-positives like '"a"+"b"' from triggering naive starts/ends-with unquoting.
- variant_select_rewriter::patch_table_scan now short-circuits before building the real_by_name HashMap when no Utf8View columns are projected (the only columns that could need patching). Saves the alloc on every TableScan against schemas without Variant.
- tantivy_rewriter::indexed_columns_for returned Option<HashMap<String, &'static str>>, cloning the whole HashMap per predicate. Return Option<&'static HashMap<…>> instead; callers only need .get(col).

82 unit tests pass (gained one: fmt picked up an existing-but-unrun case); all 11 SLT files pass.
…return The Option<&'static HashMap> return added in fe4684b made the existing '&columns' at the call-site a double-borrow that clippy needless_borrow flags. Drop the extra '&'.
Code Review — PR #16: Variant SELECT Rewriter + Architecture DocsThis is a substantial PR (+31 k / -2.7 k, 119 files). The overall approach is sound and the documentation is exceptionally good. Below are findings grouped by severity. OverviewThe core change is a two-pass
Accompanying additions: gRPC Arrow IPC ingestion, Tantivy full-text index, plan cache, host-aware auto-tuning, OpenTelemetry metrics, vendored crates. Bugs / Correctness1. pub fn rewrite_plan(plan: LogicalPlan) -> LogicalPlan {
let result = plan.clone().transform_up(...);
match result {
Ok(p) => p,
Err(e) => { warn!(...); plan } // fallback to original
}
}The clone exists only to preserve 2. fn has_placeholder(sql: &str) -> bool {
sql.as_bytes().windows(2).any(|w| w[0] == b'$' && w[1].is_ascii_digit())
}The comment correctly rules out bare 3. pub fn set_global(cache: std::sync::Arc<PlanCacheHook>) {
let _ = GLOBAL.set(cache); // silently drops if already set
}If 4. The PR adds metadata ( Known Limitations Worth Calling Out in the PR Description5. other => {
warn!(target: "variant_insert_rewriter", ...
"INSERT input is not Values/Projection; json_to_variant wrapping is skipped …");
Ok(None)
}
6. warn!(
"RAW BINARY VARIANT ON THE WIRE: peel() couldn't reach a Projection …"
);If any production query shape legitimately reaches this branch (e.g. a Security7. gRPC auth — correctly implemented ✓
8. The raw token is never written to a trace or log. Good. Performance9. let real_by_name: HashMap<&str, &Arc<Field>> = real.fields().iter().map(|f| (f.name(), f)).collect();This allocates a Code Quality — Positives
Test Coverage Gaps
Summary
The Variant SELECT rewriting approach is sound. The most actionable items before merge are #1 (plan clone cost on every cache miss), #4 (missing vendor source files — may break offline builds), and #2 (plan cache pollution from false-positive placeholder check). The rest are lower severity or follow-up candidates. |
Code Review — PR #16: Add Variant SELECT rewriter and architecture docsOverviewThis PR introduces The overall approach is well-reasoned and the code is generally high quality. Several issues are worth addressing before merge. Bugs
The // current — broken
/// Legacy method for backward compatibility …
#[instrument(name = "database.get_or_create_table", skip(self), …)]
/// Return the live parquet file URIs …
pub async fn list_file_uris(&self, …)
pub async fn get_or_create_table(&self, …) { … }
// fix
/// Return the live parquet file URIs …
pub async fn list_file_uris(&self, …)
/// Legacy method for backward compatibility …
#[instrument(name = "database.get_or_create_table", skip(self), …)]
pub async fn get_or_create_table(&self, …) { … }Known Gap — Binary Variant on the Wire for UNION / AggregateThe Any query like: SELECT payload FROM t WHERE … UNION ALL SELECT payload FROM t WHERE …
SELECT payload, COUNT(*) FROM t WHERE … GROUP BY payloadwill deliver undecodable binary bytes to the pgwire client. The fix (wrap each Union arm's leaf projection) is acknowledged in the comment. Since users of a Variant column are specifically the people most likely to write these queries, this silent data-corruption path should be either fixed or surfaced as a hard error ( Design / Correctness
DataFusion's
The lazy config reload happens at the top of
The Performance
// variant_select_rewriter.rs:94
let real_by_name: std::collections::HashMap<&str, &Arc<Field>> = real.fields().iter()…collect();
Test CoverageNo end-to-end integration test selects a Variant column
This is the core contract of this PR and should be tested directly. The Suggested test addition in // Insert with attributes Variant column, then select it back
let row = client.query_one(
"SELECT attributes FROM otel_logs_and_spans WHERE project_id = $1 LIMIT 1",
&[&"test_project"],
).await?;
let attrs: &str = row.get(0);
// Should be valid JSON, not binary
assert!(serde_json::from_str::<serde_json::Value>(attrs).is_ok());No unit tests for The peel logic has several branches (Sort → Projection, Limit → Sort → Projection, nested Distinct, Filter above Projection, MAX_PEEL guard). These branches are individually easy to get wrong and fast to unit-test with constructed Minor / Style
DocumentationThe three new docs ( Summary
The |
Previously when wrap_root_projection's peel couldn't reach a Projection through Union/Intersect/Except/Aggregate/Join (etc.) at the root, it warned and let the raw Variant bytes ship over the pgwire protocol — silent data corruption for users whose queries used GROUP BY or UNION on a Variant column. Return DataFusionError::NotImplemented with the offending root node type and column names so the failure is loud and actionable. Users must wrap each branch's leaf projection with variant_to_json() explicitly, or restructure so the outermost node is a Projection / Sort / Limit / Distinct / SubqueryAlias / Filter (all of which peel).
Prior commit missed the struct-init linebreak normalization the nightly formatter applies to the new unit-test scaffolding.
Code Review — PR #16: Variant SELECT Rewriter + Architecture DocsOverviewThis PR ships the second half of the Variant type system: a two-pass The code is high quality overall — well-commented, thoughtfully designed, with good unit tests for the new rule. A few specific concerns below. Correctness concerns1. Possible silent assignment drop in Expr::Column(col) if col.name == *field_name => None, // identity skip ✓
Expr::Alias(alias) if alias.name == *field_name => { … }
Expr::Column(_) => None, // ← catches col.name != field_name — silent drop
_ => Some(…),If DataFusion ever emits a bare Expr::Column(col) if col.name == *field_name => None, // identity
Expr::Column(col) => Some((field_name.clone(), Expr::Column(col.clone()))), // rename2.
SELECT payload FROM logs WHERE project_id = 'x' HAVING payload IS NOT NULL…and DataFusion places Minor issues3.
4. use datafusion::arrow::datatypes::DataType;
if !lying_schema.fields().iter().any(|f| matches!(f.data_type(), DataType::Utf8View)) {The 5. if !is_variant_type(target_field.data_type()) || idx >= columns.len() {
continue;
}The Test coverage gaps
Performance / design notes
Documentation
SummaryThe core
Items 2 & 3 are optional for this PR but strongly recommended as follow-ups. |
Code Review: Add Variant SELECT rewriter and comprehensive architecture docsOverviewThis is a large, well-motivated PR that introduces:
The design is sound and the comments/documentation are above average. Several specific items below warrant attention. Issues1.
|
…n-marker fix
The new regression test (added in this branch) panics inside
datafusion-variant's VariantToJsonUdf at the physical exec layer:
"Extension type name missing". patch_table_scan correctly sets the
ARROW:extension:name marker on the LogicalPlan's Field metadata, but
that metadata isn't reaching the per-row Field that
`try_field_as_variant_array` calls Field::extension_type on
(which panics rather than try_extension_type).
The fix is either upstream (don't panic on missing marker / accept
Struct{Binary,Binary} by shape) or a read-side wrapper that re-injects
the metadata. Both non-trivial; #[ignore] with a clear TODO so CI is
green while the fix is scoped.
Code Review — PR #16: Variant SELECT Rewriter + Architecture DocsThis is a very large PR (119 files, 31k+ additions). The review focuses on the highest-risk areas: the two new optimizer rules, WAL changes, and the Tantivy integration. OverviewThe PR delivers several distinct capabilities shipped together:
The code is generally well-commented and structured. The doc PRD in module headers (especially Issues Found🔴 Critical / Potential Bugs1. The test itself proves the problem: // > MAX_PEEL nested SubqueryAlias should make peel() bail rather than
// recurse forever. DataFusion's own transform_up walk over a 300-deep
// plan blows the default 2 MiB test stack, so we run the whole thing
// on a larger thread — that itself is the assertion...
std::thread::Builder::new()
.stack_size(16 * 1024 * 1024)
...The test needs a 16 MiB thread because 300 deep aliases overflows 2 MiB. Tokio's default async task stack is typically 2 MiB. A query with ~200 nested CTEs/aliases could overflow in production before MAX_PEEL fires. Consider converting 2. Hard error on Union/Aggregate root with Variant columns breaks valid queries ( return Err(datafusion::error::DataFusionError::NotImplemented(format!(
"Variant columns {:?} would exit the wire unwrapped at a {} root...",
...
)));This converts a previous silent gap into a hard error. Any existing query using 🟡 Significant Issues3.
4. INSERT ... SELECT from a Utf8 column silently skips variant wrapping ( // Shapes like `INSERT … SELECT col FROM staging` (TableScan, Filter, etc.)
// don't currently get json_to_variant wrapping — the writer will hit a
// type-mismatch when staging.col is Utf8.
other => {
warn!(...);
Ok(None)
}The 5. The docstring says "keep them in sync", but 6. Separator collision in ":".hash(&mut hasher); // separator so ("ab","c") and ("a","bc") don't collideThis comment is wrong: with FNV's fold-into-hasher approach, hashing 🟢 Minor / Suggestions7. // Map timestamp comparisons to inclusive date bounds: a strict `timestamp > T`
// still admits rows on the same calendar day, so we widen `>` to `>=`...
let date_op = match op {
Operator::Gt | Operator::GtEq => Operator::GtEq,This is semantically correct (the original predicate still post-filters), but produces surprising EXPLAIN output where 8. CI runs only The PR's test plan says:
But 9. No benchmark regression detection Five new benchmark files ( 10. The comment says this is empirically measured ("bench/multiplier_bench.py, 2026-05-17"). If production workloads with very wide schemas or different write patterns exceed the 1.15× assumption, the MemBuffer hard limit will be hit more aggressively. Consider documenting the measurement conditions so future on-call can evaluate whether the value is applicable. 11. The WAL entry size guard ( Strengths Worth Calling Out
SummaryThe core feature (VariantSelectRewriter) is well-designed and well-tested. The main actionable items before merge are:
Items 3–5 are low-effort relative to their correctness impact. |
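For reference on the FNV-1a and separator points raised in this review, here is a byte-level sketch using the standard 64-bit FNV-1a constants. The function names and the `(project_id, table_name)` key shape are assumptions for illustration; the project's hasher may feed bytes differently (for example through `Hash::hash`, which adds its own framing), so this does not settle the disagreement about the code comment.

```rust
// Standard 64-bit FNV-1a parameters.
const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

/// Fold bytes into an FNV-1a state: xor the byte, then multiply by the prime.
pub fn fnv1a_update(mut state: u64, bytes: &[u8]) -> u64 {
    for &b in bytes {
        state ^= u64::from(b);
        state = state.wrapping_mul(FNV_PRIME);
    }
    state
}

/// Hypothetical collection key: project, a ':' separator byte, then table.
pub fn collection_key(project_id: &str, table_name: &str) -> u64 {
    let h = fnv1a_update(FNV_OFFSET_BASIS, project_id.as_bytes());
    let h = fnv1a_update(h, b":");
    fnv1a_update(h, table_name.as_bytes())
}

/// Same key without the separator, shown only for contrast.
pub fn key_without_separator(project_id: &str, table_name: &str) -> u64 {
    let h = fnv1a_update(FNV_OFFSET_BASIS, project_id.as_bytes());
    fnv1a_update(h, table_name.as_bytes())
}
```

When raw bytes are folded in like this, ("ab", "c") and ("a", "bc") hash identically without a separator and differently with one. The separator only helps while the separator byte cannot appear inside the fields: ("a:", "b") and ("a", ":b") still produce the same byte stream. The hash is deterministic across builds because it has no random seed.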
…able plans Previously when wrap_root_projection couldn't peel through to a Projection (Union, Intersect, Except, Aggregate, Join, Window, …), we hard-errored. That broke valid existing queries like UNION ALL over a Variant-bearing table. Branch-aware approach: instead of trying to descend into each set-op / agg / join arm with shape-specific rewrites, *add a top-level Projection* on top of the un-peelable plan. The projection emits Expr::Column for non-Variant fields (pass-through, no metadata change) and variant_to_json(col) AS col for Variant-typed fields. Intermediate operators still see binary Variant; only the wire boundary converts. The result is a uniform contract: every SELECT-style plan returns JSON-serialised Variant text on the wire, regardless of how the user composed the query. Replaces the NotImplemented error from d5da777.
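The top-level projection described in this commit can be pictured with a toy schema, assuming a two-valued `ColType` and string expressions instead of DataFusion `Expr` values (both are simplifications introduced here). Each output field is either passed through or wrapped, keeping its name:

```rust
// Hypothetical column classification; the real check inspects Arrow data types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColType {
    Variant,
    Other,
}

/// Build the expression list for a projection placed on top of an
/// un-peelable root (Union, Aggregate, Join, ...).
pub fn root_projection_exprs(schema: &[(&str, ColType)]) -> Vec<String> {
    schema
        .iter()
        .map(|(name, ty)| match ty {
            // Variant columns are converted only at the wire boundary.
            ColType::Variant => format!("variant_to_json({name}) AS {name}"),
            // Everything else passes through untouched.
            ColType::Other => name.to_string(),
        })
        .collect()
}
```

Because the projection sits above the set operation or aggregate, the operators underneath still see the binary Variant representation, which matches the contract stated in the commit message.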
Code Review — PR #16: Variant SELECT Rewriter + Architecture Docs
OverviewThis PR lands three interrelated features:
The architecture docs are a welcome addition and reflect the actual code accurately. StrengthsVariantSelectRewriter design
Security
Operational robustness
Issues & SuggestionsBug / Correctness1. // current
other => {
warn!(...);
Ok(None)
}
// suggested
other => {
return Err(DataFusionError::Plan(format!(
"INSERT … SELECT from {} is not yet supported for Variant columns; \
use INSERT … VALUES instead", other.display()
)));
}2. Design3. async fn execute<F, Fut>(self, mem_op: F, delta_op: Fut) -> Result<u64>The future is built at the call site before async fn execute<F, G, Fut>(self, mem_op: F, delta_op: G) -> Result<u64>
where
F: FnOnce(&BufferedWriteLayer, Option<&Expr>) -> Result<u64>,
G: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<u64>>,4. Performance5. Unnecessary clone in let assignments_clone = assignments.clone(); // line 445
.execute(
|layer, pred| layer.update(project_id, table_name, pred, &assignments),
perform_delta_update(database, table_name, project_id, predicate, assignments, session).instrument(update_span),
)(Moving the clone responsibility into 6. Minor / Style7. 8. Hardcoded Variant field names in let fields = vec![
Arc::new(Field::new("metadata", DataType::Binary, false)),
Arc::new(Field::new("value", DataType::Binary, false)),
];If 9. Test plan checkboxes are unchecked Please confirm these are passing before merge — especially the integration test, since this PR touches the query planner, DML, and flush paths which interact in subtle ways. Security
SummaryThe core
Good work overall — the design decisions are well-reasoned and the code is production-quality. |
- DmlContext::execute now takes delta_op as FnOnce() -> Fut so the future (which may acquire a write lock) is only constructed when has_committed is true.
- perform_update_with_buffer: drop the unconditional assignments.clone(); clone now only happens inside the delta closure on the committed path.
- DmlOperation: unify operation-name source via #[strum(to_string=...)], drop as_uppercase(); DisplayAs delegates to ExecutionPlan::name() so CamelCase 'DeltaUpdateExec' is preserved.
- schema_loader: introduce VARIANT_METADATA_FIELD / VARIANT_VALUE_FIELD constants; convert_variant_columns and is_variant_type now use them so a future delta-kernel rename touches one place.
- variant_select_rewriter: document patch_table_scan fast path as an over-approximation (genuine Utf8View cols fall through to the full pass; the only cost is a wasted HashMap build).
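The first bullet is easiest to see in isolation. The sketch below assumes a free function `execute` with simplified argument types (the real method lives on DmlContext and its closures take more parameters), plus a tiny `block_on` helper written only so the example runs without an async runtime. The point it shows: with `delta_op: FnOnce() -> Fut`, nothing inside the closure runs unless `has_committed` is true.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, suitable only for futures that never park.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

/// Hypothetical, simplified shape of the DML executor: the delta future is
/// built lazily by calling `delta_op`, and only on the committed path.
pub async fn execute<F, G, Fut>(
    has_committed: bool,
    mem_op: F,
    delta_op: G,
) -> Result<u64, String>
where
    F: FnOnce() -> Result<u64, String>,
    G: FnOnce() -> Fut,
    Fut: Future<Output = Result<u64, String>>,
{
    let mut rows = mem_op()?;
    if has_committed {
        // The closure is invoked here, so any cloning or lock setup it
        // performs happens only when committed data exists.
        rows += delta_op().await?;
    }
    Ok(rows)
}
```

Passing an already-built future would run its construction-time side effects (argument clones, span setup) on every call, which is what the closure form avoids.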
INSERT … SELECT col FROM staging (TableScan/Filter root, not Values or Projection) previously logged a warn! and let the write proceed, which then hit an opaque Utf8→Struct type mismatch deep in the executor. Fail fast at plan time with an actionable message pointing users at INSERT … VALUES or an explicit json_to_variant(col) wrap.
Code Review — PR #16: Add Variant SELECT rewriter and comprehensive architecture docsOverviewThis is a large, well-structured PR that touches three primary areas:
The code is well-commented and the architecture docs add real long-term value. The issues below are ordered by severity. 🔴 Critical1. Core feature panics at execution time — the integration test is
#[ignore = "see TODO above — datafusion-variant requires extension marker on runtime Field"]The TODO explains that
This should block merge until the executor-side fix lands (either teach 🟠 Important2. WAL version went from 128 → 1 — backwards, with a silent data-loss upgrade path // Before
const WAL_VERSION: u8 = 128;
// After
const WAL_VERSION: u8 = 1;
The new This is a hard startup failure requiring a manual wipe of the WAL directory (and losing all unflushed in-flight data). There is no RUNBOOK entry for this, no log message guiding operators to the required action, and no in-place migration. Consider:
3. The production code correctly removed the unsafe {
std::env::set_var("AWS_S3_BUCKET", "timefusion-tests");
std::env::set_var("TIMEFUSION_TABLE_PREFIX", …);
}These tests are skipped in CI but the comment says "Run with 🟡 Minor4.
// optimizers/mod.rs — contains_project_id
matches!(
(left.as_ref(), right.as_ref()),
(Expr::Column(col), Expr::Literal(_, _)) | … if col.name == "project_id"
)But 5. Delta exclusion OR-chain is unbounded // database.rs
for (start, end) in &mem_ranges {
delta_filters.push(Expr::BinaryExpr(BinaryExpr { … OR … }));
}Each MemBuffer bucket appends one 6. Stale/concatenated doc comment on The doc block immediately above 7. // variant_select_rewriter.rs
let variant_cols: Vec<usize> = schema.fields().iter()
.enumerate()
.filter(|(_, f)| is_variant_type(f.data_type()))
.map(|(i, _)| i)
.collect();
if variant_cols.is_empty() {
return Ok(plan);
}
✅ Strengths
|
…as after TableScan patch

Two execution-time issues fixed; the previously-ignored test_variant_column_round_trips_as_json now runs and passes.

1. datafusion-variant UDFs call try_field_as_variant_array(field) and bail with 'Extension type name missing' when the arg field lacks the ARROW:extension:name=arrow.parquet.variant marker. The marker is set on the LogicalPlan schema (patch_table_scan, SchemaRegistry::fields) but doesn't survive into the physical executor's per-row Field, so any SELECT touching a Variant column would panic. VariantExtWrapper<U> is a thin ScalarUDFImpl that re-stamps the marker on arg_fields before delegating; VariantToJsonExtUdf / VariantGetExtUdf replace the upstream UDFs at registration and inline call sites. return_field_from_args is delegated explicitly because VariantGetUdf's return_type panics and the real shape comes from return_field_from_args.

2. After patch_table_scan rewrites a TableScan's projected_schema, parent Projection/Sort/Filter nodes keep their stale Utf8View-typed DFSchema, so wrap_projection's is_variant_expr check misses the Variant column on plans with intermediate projections (e.g. ORDER BY x LIMIT n introduces an outer Projection over a Sort). transform_up now both patches the scan and calls recompute_schema on each node so Variant types propagate bottom-up before the wrap pass runs.
Code Review — PR #16: Variant SELECT Rewriter + Architecture DocsOverviewThis PR adds a Strengths
Issues
|
| Gap | Severity |
|---|---|
| add_root_variant_projection (Union/Aggregate root) has no unit test | Moderate |
| VariantInsertRewriter double-analyze idempotency not tested | Minor |
| extract_dml_info with Window/Join/Union plan shape not tested | Minor |
| patch_table_scan with genuine (non-Variant) Utf8View columns not tested | Minor |
| Integration test port range (5433–5532, 100 ports) is narrow for parallel CI | Minor |
Summary
The architecture is sound and the design choices are well-documented. The main actionable items before merge are:
- Add a test for add_root_variant_projection covering an Aggregate or Union root.
- Add an idempotency guard to VariantInsertRewriter (consistent with the SELECT rewriter).
- Harden extract_dml_info against multi-input plan nodes; at minimum, document the accepted limitation.
- Widen the integration test port range to reduce the risk of flaky parallel CI failures.
Everything else is low-priority polish. The code is well-commented, the security considerations (credential redaction, tenant isolation guards) are handled correctly, and the tracing instrumentation is thorough.
Code Review — PR #16: Add Variant SELECT rewriter and comprehensive architecture docsOverview: This PR introduces a Correctness
SecurityAWS credentials stored as plaintext VARCHAR in s3_access_key_id VARCHAR(500) NOT NULL,
s3_secret_access_key VARCHAR(500) NOT NULL,The
WAL quarantine files use Performance
Schema-staleness comment in Code Quality
Test CoverageGood:
Missing:
Structural / Process
SummaryThe core logic — two-pass |
Summary
- … variant_to_json() in SELECT projections, ensuring proper serialization for PostgreSQL wire protocol

Documentation Added
- docs/ARCHITECTURE.md - Full system architecture overview with diagrams
- docs/VARIANT_TYPE_SYSTEM.md - Variant type implementation (… -> and ->>)
- docs/WAL.md - Write-ahead log implementation

Code Changes
- src/optimizers/variant_select_rewriter.rs - New analyzer rule
- src/database.rs - Unified table storage improvements
- src/dml.rs - Buffered layer integration

Test plan
- cargo test for unit tests
- cargo test --test integration_test for integration tests