From d95cdb0cfaff2582e20b872dac500ab862c295a9 Mon Sep 17 00:00:00 2001 From: Leo Meyerovich Date: Wed, 6 May 2026 17:33:45 -0700 Subject: [PATCH 1/3] cypher: retire post-#1260 reentry delegator shims --- CHANGELOG.md | 3 ++ graphistry/compute/gfql/cypher/lowering.py | 54 +++---------------- .../cypher/test_lowering_s3_split_guard.py | 31 ----------- 3 files changed, 9 insertions(+), 79 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3c620f5eb..7a9987061e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Tests - **GFQL / Cypher two-MATCH reentry varlen regression hardening (#1001)**: Strengthened reentry varlen acceptance assertions from shape-only checks to exact expected rows, and added forward/reverse split-vs-connected query equivalence regressions to guard against wrong-row drift in the `match5-25/26` query family. +### Internal +- **GFQL / Cypher row-carrier follow-through cleanup (#989, post-#1260 split)**: Retired transitional lowering-level bounded-reentry delegator shims (`_map_terminal_reentry_query`, `_drop_bare_alias_items_from_stage`, `_rewrite_multi_whole_row_prefix`, `_compile_bounded_reentry_query`) that only forwarded into `graphistry/compute/gfql/cypher/reentry/runtime.py`. Lowering now calls runtime-owned reentry helpers directly at use sites, and the split-guard tests were trimmed to keep only projection-planning delegator assertions. + ## [0.55.1 - 2026-05-05] ### Tests diff --git a/graphistry/compute/gfql/cypher/lowering.py b/graphistry/compute/gfql/cypher/lowering.py index a9e83b3bd9..422a308db5 100644 --- a/graphistry/compute/gfql/cypher/lowering.py +++ b/graphistry/compute/gfql/cypher/lowering.py @@ -7243,8 +7243,10 @@ def rewrite_text(expr: ExpressionText, field: str) -> ExpressionText: # `_collect_secondary_property_refs` would fail-fast on what is in fact a # forwarding pattern, blocking IC3 even after #1248 admits the prefix WITH. secondary_forwarding_re = re.compile(r"[A-Za-z_][A-Za-z0-9_]*") + from graphistry.compute.gfql.cypher.reentry import runtime as _reentry_runtime + cleaned_with_stages_tail = tuple( - _drop_bare_alias_items_from_stage( + _reentry_runtime._drop_bare_alias_items_from_stage( stage, secondary_aliases, identifier_re=secondary_forwarding_re ) for stage in query.with_stages[1:] @@ -7373,52 +7375,6 @@ def rewrite_text(expr: ExpressionText, field: str) -> ExpressionText: return rewritten_query, rewritten_prefix_stage, tuple(sorted(secondary_aliases)) -def _map_terminal_reentry_query( - compiled_query: CompiledCypherQuery, - *, - transform: Callable[[CompiledCypherQuery], CompiledCypherQuery], -) -> CompiledCypherQuery: - from graphistry.compute.gfql.cypher.reentry import runtime as _reentry_runtime - - return _reentry_runtime._map_terminal_reentry_query(compiled_query, transform=transform) - - -def _drop_bare_alias_items_from_stage( - stage: ProjectionStage, - aliases: AbstractSet[str], - *, - identifier_re: "re.Pattern[str]", -) -> ProjectionStage: - from graphistry.compute.gfql.cypher.reentry import runtime as _reentry_runtime - - return _reentry_runtime._drop_bare_alias_items_from_stage(stage, aliases, identifier_re=identifier_re) - - -def _rewrite_multi_whole_row_prefix( - prefix_stage: ProjectionStage, - *, - query: CypherQuery, - reentry_first_alias: Optional[str], -) -> Tuple[ProjectionStage, Tuple[ProjectionStage, ...], Dict[str, Tuple[str, ...]]]: - from graphistry.compute.gfql.cypher.reentry import runtime as _reentry_runtime - - return _reentry_runtime._rewrite_multi_whole_row_prefix( - prefix_stage, - query=query, - reentry_first_alias=reentry_first_alias, - ) - - -def _compile_bounded_reentry_query( - query: CypherQuery, - *, - params: Optional[Mapping[str, Any]] = None, -) -> CompiledCypherQuery: - from graphistry.compute.gfql.cypher.reentry import runtime as _reentry_runtime - - return _reentry_runtime._compile_bounded_reentry_query(query, params=params) - - def _compile_call_query( query: CypherQuery, *, @@ -8349,7 +8305,9 @@ def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery: params=params, ) if query.reentry_matches: - return _attach_graph_context(_compile_bounded_reentry_query(query, params=params)) + from graphistry.compute.gfql.cypher.reentry import runtime as _reentry_runtime + + return _attach_graph_context(_reentry_runtime._compile_bounded_reentry_query(query, params=params)) if query.call is not None: return _attach_graph_context(_compile_call_query(query, params=params)) if query.row_sequence: diff --git a/graphistry/tests/compute/gfql/cypher/test_lowering_s3_split_guard.py b/graphistry/tests/compute/gfql/cypher/test_lowering_s3_split_guard.py index 0828915cd2..f60d832b2e 100644 --- a/graphistry/tests/compute/gfql/cypher/test_lowering_s3_split_guard.py +++ b/graphistry/tests/compute/gfql/cypher/test_lowering_s3_split_guard.py @@ -1,8 +1,6 @@ from __future__ import annotations -from graphistry.compute.chain import Chain from graphistry.compute.gfql.cypher import lowering, projection_planning -from graphistry.compute.gfql.cypher.reentry import runtime as reentry_runtime def test_issue_1301_projection_split_delegator_round_trip() -> None: @@ -11,17 +9,6 @@ def test_issue_1301_projection_split_delegator_round_trip() -> None: extracted = projection_planning._split_qualified_name(expr, line=1, column=1) assert lowered == extracted == ("person", "name") - -def test_issue_1301_reentry_runtime_delegator_identity_path() -> None: - compiled = lowering.CompiledCypherQuery(chain=Chain([], validate=False)) - - lowered = lowering._map_terminal_reentry_query(compiled, transform=lambda q: q) - extracted = reentry_runtime._map_terminal_reentry_query(compiled, transform=lambda q: q) - - assert lowered is compiled - assert extracted is compiled - - def test_issue_1301_projection_delegator_forwards_args(monkeypatch) -> None: captured = {} alias_obj = object() @@ -43,21 +30,3 @@ def _stub(expr, *, alias_targets, params, field, line, column): assert out == ("x", "y") assert captured["args"] == ("a.b", alias_targets, {"p": 1}, "return.item", 7, 11) - - -def test_issue_1301_reentry_compile_delegator_forwards_params(monkeypatch) -> None: - captured = {} - - def _stub(query, *, params): - captured["query"] = query - captured["params"] = params - return "compiled" - - monkeypatch.setattr(reentry_runtime, "_compile_bounded_reentry_query", _stub) - query = object() - params = {"limit": 5} - out = lowering._compile_bounded_reentry_query(query, params=params) - - assert out == "compiled" - assert captured["query"] is query - assert captured["params"] == params From 3804628ab486b2e0172878fbd8bfc6217a0e031c Mon Sep 17 00:00:00 2001 From: Leo Meyerovich Date: Wed, 6 May 2026 17:40:22 -0700 Subject: [PATCH 2/3] ci: retrigger clean pr workflow From 2e17c6e9c69978c555bd4f0160714c5519b4c165 Mon Sep 17 00:00:00 2001 From: Leo Meyerovich Date: Wed, 6 May 2026 22:47:49 -0700 Subject: [PATCH 3/3] cypher: retire post-#1260 reentry delegator shims --- CHANGELOG.md | 7 + docs/source/gfql/cypher.rst | 52 +- docs/source/gfql/spec/cypher_mapping.md | 20 +- graphistry/compute/gfql/cypher/lowering.py | 29 -- .../compute/gfql/cypher/reentry/__init__.py | 2 + .../compute/gfql/cypher/reentry/execution.py | 472 +++++++++++++++++ .../compute/gfql/cypher/result_postprocess.py | 36 +- graphistry/compute/gfql_unified.py | 475 +----------------- .../compute/gfql/cypher/test_lowering.py | 161 +++++- 9 files changed, 715 insertions(+), 539 deletions(-) create mode 100644 graphistry/compute/gfql/cypher/reentry/execution.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a9987061e..28cc883214 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,8 +8,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Development] +### Internal +- **GFQL / Cypher bounded-reentry runtime extraction (#987 Step 3)**: Moved bounded-reentry data-frame execution helpers (`_compiled_query_reentry_state`, `_compiled_query_scalar_reentry_state`, `_compiled_query_freeform_reentry_state`, `_freeform_broadcast_row_to_nodes`, `_union_scalar_reentry_results`, `_apply_optional_reentry_null_fill`, `_aligned_reentry_rows`, `_reentry_carry_payload`, `_ordered_reentry_start_nodes`, `_reentry_validation_error`, the two suggestion constants) out of `graphistry/compute/gfql_unified.py` into a new `graphistry/compute/gfql/cypher/reentry/execution.py` module so the bounded-reentry contract assembled at compile time (`ReentryPlan`) and the matching data-frame stitching live next to each other. `_entity_projection_meta_entry` moved to `graphistry/compute/gfql/cypher/result_postprocess.py` next to `WholeRowProjectionMeta` since it is shared between the connected-OPTIONAL-MATCH and bounded-reentry paths. Pure-move refactor — no semantic change; `gfql_unified.py` shrinks by ~440 LOC and now re-exports the moved private names via aliased imports so existing tests reaching into `graphistry.compute.gfql_unified._compiled_query_reentry_state` continue to work. + ### Documentation - **GFQL component-labeling examples + README clarity (#1324)**: Added concise WCC/SCC labeling examples for `compute_cugraph`, `compute_igraph('clusters')`, and local Cypher `CALL graphistry.cugraph.*` write/row modes in GFQL docs, clarified that component IDs are partition labels (not stable semantic IDs), and tightened the main README GFQL intro sentence for readability. +- **GFQL / Cypher docs — variable-length boundary refresh (#973)**: Updated direct-Cypher capability docs (`docs/source/gfql/cypher.rst`, `docs/source/gfql/spec/cypher_mapping.md`) to reflect current support for connected variable-length patterns and bounded/exact variable-length `WHERE` pattern predicates, while preserving explicit fail-fast notes for remaining path/list-carrier and advanced row-shaping gaps. + +### Changed +- **GFQL / Cypher lowering — bounded/exact variable-length `WHERE` pattern predicates (#973)**: Removed the pre-normalization compiler gate that rejected bounded/exact variable-length `WHERE` pattern predicates and now lower these shapes through the existing WHERE-pattern rewrite and row-filter paths. Converted the old fail-fast test into positive execution coverage and added boolean-wrapper amplification (`OR`/`XOR`/`NOT`) for bounded variable-length `WHERE` predicates in `graphistry/tests/compute/gfql/cypher/test_lowering.py`. ### Tests - **GFQL / Cypher two-MATCH reentry varlen regression hardening (#1001)**: Strengthened reentry varlen acceptance assertions from shape-only checks to exact expected rows, and added forward/reverse split-vs-connected query equivalence regressions to guard against wrong-row drift in the `match5-25/26` query family. diff --git a/docs/source/gfql/cypher.rst b/docs/source/gfql/cypher.rst index 31652697e9..4e4dad0188 100644 --- a/docs/source/gfql/cypher.rst +++ b/docs/source/gfql/cypher.rst @@ -206,11 +206,11 @@ Support Matrix - Execute directly through ``g.gfql("...")``. Helper translation to a single ``Chain`` is stricter. * - Variable-length relationship patterns - Partial - - Direct Cypher supports endpoint-only traversals such as ``[*2]``, - ``[*1..3]``, ``[*]``, and typed forms like ``[:R*2..4]``, plus bounded - connected multi-relationship patterns where the row shape stays in the - current supported subset. Path/list-carrier uses, bounded/exact - ``WHERE`` pattern predicates, and broader branching/path-shaping cases + - Direct Cypher supports endpoint traversals such as ``[*2]``, + ``[*1..3]``, ``[*]``, and typed forms like ``[:R*2..4]``; connected + multi-relationship variable-length patterns; and bounded/exact/fixed-point + variable-length ``WHERE`` pattern predicates in the current row-shaped + subset. Path/list-carrier uses and unsupported path/row-shaping cases still fail fast. * - ``CREATE`` / ``DELETE`` / ``SET`` - Not supported @@ -236,9 +236,10 @@ Pattern Matching Forms - Node labels and multi-label node patterns such as ``(p:Person:Admin)``. - Relationship direction forms ``->``, ``<-``, and undirected ``-[]-``. - Relationship type alternation such as ``[r:KNOWS|HATES]``. -- Single variable-length relationship patterns when they are the only - relationship in the connected pattern, including ``[*n]``, ``[*m..n]``, - ``[*]``, and typed forms such as ``[:R*2..4]``. +- Single variable-length relationship patterns, including ``[*n]``, + ``[*m..n]``, ``[*]``, and typed forms such as ``[:R*2..4]``. +- Connected patterns that mix variable-length and fixed-length relationships, + such as ``MATCH (a)-[:R*2]->()-[:S]->(c) RETURN c``. - Connected comma-separated patterns such as ``MATCH (a)-[:A]->(b), (b)-[:B]->(c)``. - Repeated ``MATCH`` clauses when they stay connected through shared aliases. @@ -255,19 +256,18 @@ WHERE Forms - Label predicates such as ``WHERE b:Foo:Bar``. - Relationship-type predicates such as ``WHERE type(r) = 'KNOWS'``. - Positive relationship-existence pattern predicates such as - ``WHERE (n)-[:R]->()`` and bare fixed-point variable-length existence checks - such as ``WHERE (n)-[*]-()``. -- One positive relationship-existence pattern predicate may be combined with - ordinary row filters through top-level ``AND``, for example - ``WHERE n.kind = 'x' AND (n)-[:R*]->() AND n.id <> 'a'``. + ``WHERE (n)-[:R]->()`` and variable-length existence checks such as + ``WHERE (n)-[*]-()`` and ``WHERE (n)-[:R*2]->()``. +- Pattern predicates can be combined with row predicates in the current + boolean subset, including ``AND`` / ``OR`` / ``XOR`` and ``NOT`` forms. Variable-Length Relationship Boundary ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Direct Cypher multihop support is intentionally narrow in the current landing -slice. The supported direct forms include endpoint traversals and bounded -connected multi-relationship patterns where the result stays in the current -row-shaping subset, for example: +Direct Cypher multihop support remains intentionally bounded. The supported +direct forms include endpoint traversals, connected multi-relationship +patterns, and variable-length ``WHERE`` pattern predicates where the result +stays in the current row-shaping subset, for example: - ``MATCH (a)-[*2]->(b) RETURN b`` - ``MATCH (a)-[:R*1..3]->(b) RETURN b`` @@ -275,20 +275,16 @@ row-shaping subset, for example: - ``MATCH (a)-[:R*1..2]-(b) RETURN b`` - ``MATCH (a)-[:R*2]->(b)-[:S]->(c) RETURN c`` - ``MATCH (a)-[:R]->(b), (b)-[:S*1..2]->(c) RETURN a.id AS a_id, c.id AS c_id`` +- ``MATCH (n) WHERE (n)-[:R*2]->() RETURN n`` +- ``MATCH (n) WHERE NOT (n)-[:R*2]->() RETURN n.id AS id`` The current compiler explicitly rejects these remaining subfamilies with ``GFQLValidationError`` instead of attempting unsound execution: - path/list-carrier use of a variable-length relationship alias, such as ``RETURN r`` or ``count(r)`` -- exact or bounded variable-length ``WHERE`` pattern predicates such as - ``WHERE (n)-[:R*2]-()`` -- top-level ``OR`` / ``NOT`` around variable-length ``WHERE`` pattern - predicates, or more than one positive pattern predicate in the same - ``WHERE`` clause -- branching connected multihop patterns, or shapes that would require - unsupported path/relationship-carrier row shaping around a variable-length - segment +- shapes that still require unsupported path/relationship-carrier row shaping + around a variable-length segment - connected multi-pattern relationship-alias projection such as ``RETURN r`` / ``r.prop`` when it would require unsupported row shaping - multi-alias ``RETURN *`` projections that would require unsupported @@ -431,10 +427,8 @@ Not Supported Today - Variable-length relationship aliases used as path/list carriers, such as ``RETURN r`` or ``count(r)``. -- Exact or bounded variable-length ``WHERE`` pattern predicates such as - ``WHERE (n)-[:R*2]-()``. -- Branching connected multihop patterns, or connected multihop shapes that - still require unsupported path/relationship-carrier row shaping. +- Connected multihop shapes that still require unsupported + path/relationship-carrier row shaping. - Multiple disconnected ``MATCH`` patterns used as arbitrary joins. - Multi-pattern re-entry shapes beyond the bounded single ``MATCH ... WITH ... MATCH ... RETURN`` form. diff --git a/docs/source/gfql/spec/cypher_mapping.md b/docs/source/gfql/spec/cypher_mapping.md index a920bfe116..754611ab5c 100644 --- a/docs/source/gfql/spec/cypher_mapping.md +++ b/docs/source/gfql/spec/cypher_mapping.md @@ -36,11 +36,11 @@ When translating from Cypher, you'll encounter three scenarios: ### Direct Translations - Graph patterns: `(a)-[r]->(b)` → chain operations - Property filters: WHERE clauses embed into operations -- Path traversals: direct `g.gfql("MATCH ...")` supports endpoint-only single - variable-length relationship forms such as `[*2]`, `[*1..3]`, and `[*]`. - Native GFQL still gives you the full explicit hop surface, including output - slicing, intermediate-hop aliasing, and rewrites for currently unsupported - direct-Cypher multihop shapes. +- Path traversals: direct `g.gfql("MATCH ...")` supports single and connected + variable-length relationship forms such as `[*2]`, `[*1..3]`, and `[*]`, + including bounded/exact variable-length `WHERE` pattern predicates in the + current row-shaped subset. Native GFQL still gives you the full explicit hop + surface (output slicing, intermediate-hop aliasing, and custom rewrites). - Pattern composition: Multiple patterns become sequential operations - Same-path constraints: `WHERE` across steps → `g.gfql([...], where=[...])` @@ -255,10 +255,10 @@ g.gfql([ ### Edge Patterns Rows using `[*...]` below show the native GFQL rewrite for the same traversal -intent. Direct `g.gfql("MATCH ...")` now accepts these endpoint-only -single-variable-length relationship forms, while native GFQL remains the more -explicit option when you need intermediate-hop control or unsupported mixed -pattern shapes. +intent. Direct `g.gfql("MATCH ...")` accepts these variable-length forms in +the supported direct-Cypher subset, while native GFQL remains the more explicit +option when you need intermediate-hop control or advanced path/list-carrier +semantics. | Cypher / intent | Python | Wire Protocol (compact) | |-----------------|--------|-------------------------| @@ -274,7 +274,7 @@ pattern shapes. | `-[r:BOUGHT {amount: gt(100)}]->` | `e_forward({"type": "BOUGHT", "amount": gt(100)}, name="r")` | `{"type": "Edge", "direction": "forward", "edge_match": {"type": "BOUGHT", "amount": {"type": "GT", "val": 100}}, "name": "r"}` | When you need constraints on intermediate hops, path/list-carrier semantics, or -mixed connected patterns beyond the current direct-Cypher subset, use repeated +advanced row-shaping beyond the current direct-Cypher subset, use repeated single-hop GFQL steps with aliases instead of collapsing the traversal into one multihop edge operator. diff --git a/graphistry/compute/gfql/cypher/lowering.py b/graphistry/compute/gfql/cypher/lowering.py index 422a308db5..5031749f9c 100644 --- a/graphistry/compute/gfql/cypher/lowering.py +++ b/graphistry/compute/gfql/cypher/lowering.py @@ -5590,34 +5590,6 @@ def _is_variable_length_relationship_pattern(relationship: RelationshipPattern) ) -def _reject_unsupported_variable_length_where_pattern_predicates(query: CypherQuery) -> None: - if query.where is None: - return - predicates: List[WherePatternPredicate] = [ - predicate for predicate in query.where.predicates if isinstance(predicate, WherePatternPredicate) - ] - if query.where.expr_tree is not None: - predicates.extend(_where_expr_tree_pattern_predicates(query.where.expr_tree)) - for predicate in predicates: - relationships = [ - element - for element in predicate.pattern - if isinstance(element, RelationshipPattern) - ] - for relationship in relationships: - if not _is_variable_length_relationship_pattern(relationship): - continue - if relationship.min_hops is None and relationship.max_hops is None and relationship.to_fixed_point: - continue - raise _unsupported( - "Cypher WHERE pattern predicates currently support only bare variable-length fixed-point relationships, not exact or bounded hop counts", - field="where", - value=boolean_expr_to_text(query.where.expr_tree) if query.where.expr_tree is not None else None, - line=predicate.span.line, - column=predicate.span.column, - ) - - def _reject_nonterminal_variable_length_relationship_patterns(query: CypherQuery) -> None: # noqa: ARG001 """No-op: variable-length rels in connected patterns are now supported. @@ -8287,7 +8259,6 @@ def _attach_graph_context(result: CompiledCypherQuery) -> CompiledCypherQuery: normalizer = ASTNormalizer() query = normalizer.rewrite_shortest_path(query) - _reject_unsupported_variable_length_where_pattern_predicates(query) _reject_variable_length_path_alias_references(query, params=params) query = normalizer.rewrite_where_pattern_predicates(query) diff --git a/graphistry/compute/gfql/cypher/reentry/__init__.py b/graphistry/compute/gfql/cypher/reentry/__init__.py index df9ce0c2cf..6b4fcc8931 100644 --- a/graphistry/compute/gfql/cypher/reentry/__init__.py +++ b/graphistry/compute/gfql/cypher/reentry/__init__.py @@ -6,6 +6,8 @@ - prefix carry-column / order helpers (``carry``) - AST/query rewriters that retarget reentry expressions onto carried columns (``rewrite``) +- compile-time bounded-reentry query rewrites (``runtime``) +- data-frame execution stitching for bounded reentry (``execution``; #987 Step 3) Public symbols are re-exported from ``cypher.lowering`` so existing imports (``from graphistry.compute.gfql.cypher.lowering import _reentry_hidden_column_name``) diff --git a/graphistry/compute/gfql/cypher/reentry/execution.py b/graphistry/compute/gfql/cypher/reentry/execution.py new file mode 100644 index 0000000000..6e38ba56f9 --- /dev/null +++ b/graphistry/compute/gfql/cypher/reentry/execution.py @@ -0,0 +1,472 @@ +"""Bounded-reentry data-frame execution helpers. + +Extracted from ``graphistry/compute/gfql_unified.py`` under #987 Step 3 +(`move runtime stitching into a dedicated reentry module`). Pure-move +refactor — no semantic changes vs the prior in-line definitions. + +Together with ``cypher/reentry_plan.py`` (compile-time contract), +``cypher/reentry/runtime.py`` (compile-time query rewrites), and the +``CompiledCypherQuery.start_nodes_query`` chain, this module owns the +*data-frame side* of bounded ``MATCH ... WITH ... MATCH ...`` re-entry: +preparing the dispatch base graph, building the seeded ``start_nodes`` +table, fanning out multi-row scalar / free-form prefixes, and null-filling +unmatched rows for the OPTIONAL re-entry case. + +Callers (currently only ``gfql_unified.py``): + +- ``_compiled_query_reentry_state`` — whole-row carry +- ``_compiled_query_scalar_reentry_state`` — scalar-only prefix carry +- ``_compiled_query_freeform_reentry_state`` — free-form intermediate MATCH (single-row) +- ``_freeform_broadcast_row_to_nodes`` — single-row broadcast (also used per-row in #1285 multi-row) +- ``_union_scalar_reentry_results`` — concat per-row dispatches +- ``_apply_optional_reentry_null_fill`` — OPTIONAL re-entry null fill +""" +# ruff: noqa: E501 +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, cast + +from graphistry.Engine import EngineAbstract, df_concat, df_cons, resolve_engine, safe_merge +from graphistry.Plottable import Plottable +from graphistry.compute.exceptions import GFQLValidationError, ErrorCode +from graphistry.compute.gfql.cypher.lowering import ( + CompiledCypherQuery, + _reentry_hidden_column_name, +) +from graphistry.compute.gfql.cypher.reentry_plan import ReentryPlan +from graphistry.compute.gfql.cypher.result_postprocess import ( + entity_projection_meta_entry, +) +from graphistry.compute.typing import DataFrameT, SeriesT + + +REENTRY_WHOLE_ROW_SUGGESTION = "Carry a whole-row node alias through WITH before MATCH re-entry." +REENTRY_SCALAR_SUGGESTION = "Carry scalar columns through WITH before MATCH re-entry." + + +def reentry_validation_error( + message: str, + *, + value: Any, + suggestion: str, + field: str = "with", +) -> GFQLValidationError: + return GFQLValidationError( + ErrorCode.E108, + message, + field=field, + value=value, + suggestion=suggestion, + language="cypher", + ) + + +def apply_optional_reentry_null_fill( + result: Plottable, + *, + prefix_result: Plottable, + engine: Union[EngineAbstract, str], + empty_result_row: Optional[Dict[str, Any]] = None, +) -> Plottable: + """Null-fill result rows for prefix rows that the optional reentry didn't match.""" + prefix_df = getattr(prefix_result, "_nodes", None) + result_df = getattr(result, "_nodes", None) + + if prefix_df is None or len(prefix_df) == 0: + return result + + prefix_rows = len(prefix_df) + result_rows = 0 if result_df is None else len(result_df) + + if result_rows >= prefix_rows: + return result + + concrete_engine = resolve_engine(cast(Any, engine), result) + df_ctor = df_cons(concrete_engine) + concat = df_concat(concrete_engine) + + # Use the compiled empty_result_row template (correct projected column names) + # or fall back to the result's own columns. + if empty_result_row is not None: + null_row = dict(empty_result_row) + elif result_df is not None and len(result_df.columns) > 0: + null_row = {col: None for col in result_df.columns} + else: + null_row = {} + + if result_df is None or len(result_df) == 0: + if null_row: + out = result.bind() + out._nodes = df_ctor([dict(null_row) for _ in range(prefix_rows)]) + return out + return result + + missing_count = prefix_rows - result_rows + fill_rows = [dict(null_row) for _ in range(missing_count)] + + fill_df = df_ctor(fill_rows) + out = result.bind() + out._nodes = concat([result_df, fill_df], ignore_index=True, sort=False) + edges_df = getattr(result, "_edges", None) + if edges_df is not None: + out._edges = edges_df[:0] + return out + + +def compiled_query_reentry_state( + base_graph: Plottable, + plan: ReentryPlan, + prefix_result: Plottable, + *, + engine: Union[EngineAbstract, str], +) -> Tuple[Plottable, DataFrameT]: + if plan.scalar_only or plan.free_form: + raise reentry_validation_error( + "Cypher whole-row reentry dispatcher received a non-whole-row ReentryPlan", + value=plan.reentry_alias_name, + suggestion=REENTRY_WHOLE_ROW_SUGGESTION, + ) + output_name = plan.reentry_alias_name + carried_columns = tuple(plan.scalar_columns) + meta = entity_projection_meta_entry( + prefix_result, + output_name=output_name, + field="with", + message="Cypher MATCH after WITH could not recover carried node identities from the prefix stage", + suggestion=REENTRY_WHOLE_ROW_SUGGESTION, + ) + if meta["table"] != "nodes": + raise reentry_validation_error( + "Cypher MATCH after WITH currently supports node re-entry only", + value=output_name, + suggestion=REENTRY_WHOLE_ROW_SUGGESTION, + ) + ids = meta["ids"] + id_column = meta["id_column"] + if not hasattr(ids, "dropna"): + raise reentry_validation_error( + "Cypher MATCH after WITH could not recover carried node identities from the prefix stage", + value=output_name, + suggestion=REENTRY_WHOLE_ROW_SUGGESTION, + ) + base_nodes = getattr(base_graph, "_nodes", None) + if base_nodes is None or id_column not in base_nodes.columns: + raise reentry_validation_error( + "Cypher MATCH after WITH could not recover the base node table for re-entry", + value=id_column, + suggestion=REENTRY_WHOLE_ROW_SUGGESTION, + ) + concrete_engine = resolve_engine(cast(Any, engine), base_graph) + carried_ids, aligned_prefix_rows = aligned_reentry_rows( + ids=cast(SeriesT, ids), + prefix_rows=getattr(prefix_result, "_nodes", None), + output_name=output_name, + ) + carried_node_ids = cast(DataFrameT, df_cons(concrete_engine)({id_column: carried_ids})) + if not carried_columns: + return base_graph, ordered_reentry_start_nodes( + node_rows=base_nodes, + carried_node_ids=carried_node_ids, + id_column=id_column, + ) + if aligned_prefix_rows is None: + raise reentry_validation_error( + "Cypher MATCH after WITH could not recover carried row columns from the prefix stage", + value=output_name, + suggestion=REENTRY_SCALAR_SUGGESTION, + ) + duplicate_mask = carried_ids.duplicated() + if bool(duplicate_mask.any()) if hasattr(duplicate_mask, "any") else False: + raise reentry_validation_error( + "Cypher MATCH after WITH carried scalar columns currently require unique carried node rows", + value=output_name, + suggestion="Use a single-node seed WITH shape, or avoid carrying scalar columns into MATCH re-entry.", + ) + + carry_payload = reentry_carry_payload( + carried_node_ids=carried_node_ids, + prefix_rows=aligned_prefix_rows, + carried_columns=carried_columns, + ) + hidden_columns = [name for name in map(_reentry_hidden_column_name, carried_columns) if name in base_nodes.columns] + merge_base = cast(DataFrameT, base_nodes.drop(columns=hidden_columns)) if hidden_columns else base_nodes + node_rows = cast(DataFrameT, safe_merge(merge_base, carry_payload, on=id_column, how="left")) + + dispatch_graph = base_graph.bind() + dispatch_graph._nodes = node_rows + edges_df = getattr(base_graph, "_edges", None) + if edges_df is not None: + dispatch_graph._edges = edges_df + return dispatch_graph, ordered_reentry_start_nodes( + node_rows=node_rows, + carried_node_ids=carried_node_ids, + id_column=id_column, + ) + + +def union_scalar_reentry_results( + row_results: List[Plottable], + *, + base_graph: Plottable, + engine: Union[EngineAbstract, str], +) -> Plottable: + """Union per-row suffix results from a multi-row scalar prefix (#1047).""" + node_frames = [] + for r in row_results: + nodes = getattr(r, "_nodes", None) + if nodes is not None and len(cast(Any, nodes)) > 0: + node_frames.append(nodes) + result = base_graph.bind() + if node_frames: + concrete_engine = resolve_engine(cast(Any, engine), node_frames[0]) + concat = df_concat(concrete_engine) + result._nodes = cast(DataFrameT, concat(node_frames, ignore_index=True)) + else: + base_nodes = getattr(base_graph, "_nodes", None) + result._nodes = cast(DataFrameT, base_nodes.iloc[0:0]) if base_nodes is not None else None + result._edges = getattr(base_graph, "_edges", None) + return result + + +def compiled_query_scalar_reentry_state( + base_graph: Plottable, + prefix_result: Plottable, + *, + carried_columns: Sequence[str], + engine: Union[EngineAbstract, str], + row_index: int = 0, +) -> Tuple[Plottable, Optional[DataFrameT]]: + prefix_rows = getattr(prefix_result, "_nodes", None) + if prefix_rows is None: + raise reentry_validation_error( + "Cypher MATCH after WITH scalar-only prefix stages could not recover prefix rows", + value=None, + suggestion="Project scalar columns directly before MATCH re-entry.", + ) + prefix_row_count = len(prefix_rows) + base_nodes = getattr(base_graph, "_nodes", None) + if prefix_row_count == 0: + if base_nodes is None: + return base_graph, None + dispatch_graph = base_graph.bind() + dispatch_graph._nodes = cast(DataFrameT, base_nodes.iloc[0:0]) + edges_df = getattr(base_graph, "_edges", None) + if edges_df is not None: + dispatch_graph._edges = cast(DataFrameT, edges_df.iloc[0:0]) + return dispatch_graph, None + if base_nodes is None: + raise reentry_validation_error( + "Cypher MATCH after WITH scalar-only prefix stages could not recover the base node table for re-entry", + value=None, + suggestion="Retry with a node-backed graph before MATCH re-entry.", + ) + if not carried_columns: + # Scalar-only prefix with zero carried scalars: keep the full node table. + # Row fan-out/union for multi-row prefixes happens in the caller. + dispatch_graph = base_graph.bind() + dispatch_graph._nodes = base_nodes + edges_df = getattr(base_graph, "_edges", None) + if edges_df is not None: + dispatch_graph._edges = edges_df + return dispatch_graph, None + missing_column = next((name for name in carried_columns if name not in prefix_rows.columns), None) + if missing_column is not None: + raise reentry_validation_error( + "Cypher MATCH after WITH scalar-only prefix stages could not recover a carried scalar column from the prefix stage", + value=missing_column, + suggestion="Project the scalar column explicitly before MATCH re-entry.", + ) + row = prefix_rows.iloc[row_index] + node_rows = cast( + DataFrameT, + base_nodes.assign( + **{ + _reentry_hidden_column_name(output_name): row[output_name] + for output_name in carried_columns + } + ), + ) + dispatch_graph = base_graph.bind() + dispatch_graph._nodes = node_rows + edges_df = getattr(base_graph, "_edges", None) + if edges_df is not None: + dispatch_graph._edges = edges_df + return dispatch_graph, None + + +def freeform_broadcast_row_to_nodes( + base_graph: Plottable, + base_nodes: DataFrameT, + prefix_rows: DataFrameT, + plan: ReentryPlan, + *, + row_index: int, +) -> Plottable: + """Build a dispatch graph for a single free-form prefix row. + + Broadcasts that row's carried hidden columns onto every base node so the + trailing MATCH (running global, with `start_nodes=None`) inherits the + carried values via the row pipeline. Used for both single-prefix-row and + multi-prefix-row (#1285) free-form lanes. + """ + row = prefix_rows.iloc[row_index] + broadcast_values: Dict[str, Any] = {} + # Top-level scalar carries (e.g. ``WITH a, b.id AS bid``): the prefix row + # exposes them under their output names; the runtime hidden column on the + # base node table is keyed by ``_reentry_hidden_column_name``. + for col in plan.scalar_columns: + if col in prefix_rows.columns: + broadcast_values[_reentry_hidden_column_name(col)] = row[col] + # Non-source whole-row property carries (slice 4.3b from #1248): the prefix + # row already exposes these under their `__cypher_reentry_*` names; copy + # them across as-is. + for col in prefix_rows.columns: + if isinstance(col, str) and col.startswith("__cypher_reentry_"): + broadcast_values[col] = row[col] + + if broadcast_values: + existing_hidden = [ + c for c in base_nodes.columns + if isinstance(c, str) and c.startswith("__cypher_reentry_") + ] + node_rows = ( + cast(DataFrameT, base_nodes.drop(columns=existing_hidden)) + if existing_hidden + else base_nodes + ) + node_rows = cast(DataFrameT, node_rows.assign(**broadcast_values)) + else: + node_rows = cast(DataFrameT, base_nodes) + + dispatch_graph = base_graph.bind() + dispatch_graph._nodes = node_rows + edges_df = getattr(base_graph, "_edges", None) + if edges_df is not None: + dispatch_graph._edges = edges_df + return dispatch_graph + + +def compiled_query_freeform_reentry_state( + base_graph: Plottable, + compiled_query: CompiledCypherQuery, + prefix_result: Plottable, + *, + engine: Union[EngineAbstract, str], +) -> Tuple[Plottable, Optional[DataFrameT]]: + """#1263 free-form intermediate MATCH (LDBC SNB IC3 endpoint), single-row. + + The trailing MATCH binds aliases that are NOT in the prefix WITH's carried + whole-row set, so it must run against the full base graph (no carried-id + seed filter). Carried hidden columns from the prefix row are broadcast + onto every base node so the row pipeline carries them through whichever + alias the trailing MATCH binds; downstream WHERE/RETURN expressions + referencing carried-alias properties resolve through those broadcast + columns. + + Single-prefix-row dispatch only. Multi-prefix-row free-form (#1285) is + handled at the caller via a per-row union loop (mirror of the scalar-only + multi-row pattern at ``_execute_compiled_query_with_reentry``). + """ + prefix_rows = getattr(prefix_result, "_nodes", None) + base_nodes = getattr(base_graph, "_nodes", None) + if base_nodes is None: + raise reentry_validation_error( + "Cypher MATCH after WITH (free-form intermediate MATCH; #1263) " + "could not recover the base node table for re-entry", + value=None, + suggestion=REENTRY_WHOLE_ROW_SUGGESTION, + ) + if prefix_rows is None or len(prefix_rows) == 0: + # Empty prefix → empty result. Return a graph with empty nodes/edges + # so the suffix produces no rows. + dispatch_graph = base_graph.bind() + dispatch_graph._nodes = cast(DataFrameT, base_nodes.iloc[0:0]) + edges_df = getattr(base_graph, "_edges", None) + if edges_df is not None: + dispatch_graph._edges = cast(DataFrameT, edges_df.iloc[0:0]) + return dispatch_graph, None + # Single-row dispatch only; the caller routes multi-row through the + # per-row union loop in ``_execute_compiled_query_with_reentry``. + if len(prefix_rows) > 1: + raise reentry_validation_error( + "Cypher MATCH after WITH (free-form intermediate MATCH) single-row " + "dispatcher invoked with a multi-row prefix; the caller should " + "route multi-row free-form through the per-row union loop.", + value=len(prefix_rows), + suggestion=REENTRY_WHOLE_ROW_SUGGESTION, + ) + + plan = compiled_query.reentry_plan + if plan is None: + # Defensive: caller already gated on plan.free_form, so reaching here + # without a plan is a programmer error. + raise reentry_validation_error( + "Cypher free-form intermediate MATCH dispatched without a ReentryPlan", + value=None, + suggestion=REENTRY_WHOLE_ROW_SUGGESTION, + ) + + dispatch_graph = freeform_broadcast_row_to_nodes( + base_graph, cast(DataFrameT, base_nodes), cast(DataFrameT, prefix_rows), plan, row_index=0, + ) + return dispatch_graph, None + + +def aligned_reentry_rows( + *, + ids: SeriesT, + prefix_rows: Optional[DataFrameT], + output_name: Optional[str], +) -> Tuple[SeriesT, Optional[DataFrameT]]: + if prefix_rows is not None and len(prefix_rows) != len(ids): + raise reentry_validation_error( + "Cypher MATCH after WITH metadata row counts disagreed with prefix rows during re-entry", + value=output_name, + suggestion="Retry with a direct whole-row carry through WITH or inspect intermediate row-shaping before MATCH re-entry.", + ) + if not hasattr(ids, "notna"): + raise reentry_validation_error( + "Cypher MATCH after WITH could not align carried node identities from the prefix stage", + value=output_name, + suggestion=REENTRY_WHOLE_ROW_SUGGESTION, + ) + + non_null_mask = cast(SeriesT, ids.notna()) + carried_ids = cast(SeriesT, ids[non_null_mask].reset_index(drop=True)) + if prefix_rows is None: + return carried_ids, None + return carried_ids, cast(DataFrameT, prefix_rows.loc[non_null_mask].reset_index(drop=True)) + + +def reentry_carry_payload( + *, + carried_node_ids: DataFrameT, + prefix_rows: DataFrameT, + carried_columns: Sequence[str], +) -> DataFrameT: + missing_column = next((name for name in carried_columns if name not in prefix_rows.columns), None) + if missing_column is not None: + raise reentry_validation_error( + "Cypher MATCH after WITH could not recover a carried scalar column from the prefix stage", + value=missing_column, + suggestion="Project the scalar column explicitly before MATCH re-entry.", + ) + return cast( + DataFrameT, + carried_node_ids.assign( + **{ + _reentry_hidden_column_name(output_name): cast(SeriesT, prefix_rows[output_name]).reset_index(drop=True) + for output_name in carried_columns + } + ), + ) + + +def ordered_reentry_start_nodes( + *, + node_rows: DataFrameT, + carried_node_ids: DataFrameT, + id_column: str, +) -> DataFrameT: + # MATCH re-entry must preserve the WITH row order, not the base node-table order. + return cast(DataFrameT, safe_merge(carried_node_ids, node_rows, on=id_column, how="left")) diff --git a/graphistry/compute/gfql/cypher/result_postprocess.py b/graphistry/compute/gfql/cypher/result_postprocess.py index d8babf56db..555ef343c7 100644 --- a/graphistry/compute/gfql/cypher/result_postprocess.py +++ b/graphistry/compute/gfql/cypher/result_postprocess.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Literal, Optional, Sequence, TypedDict, cast +from typing import Any, Dict, Literal, Optional, Sequence, TypedDict, cast import pandas as pd @@ -33,6 +33,40 @@ class WholeRowProjectionMeta(TypedDict): ids: SeriesT +def entity_projection_meta_entry( + result: Plottable, + *, + output_name: str, + field: str, + message: str, + suggestion: str, +) -> WholeRowProjectionMeta: + """Look up a whole-row projection-meta entry on a Plottable result. + + Raises a Cypher-language ``GFQLValidationError`` (E108) when the + side-channel metadata is missing or does not contain ``output_name``. + Shared by the connected-OPTIONAL-MATCH and bounded-reentry execution + paths in ``gfql_unified``. + """ + from graphistry.compute.exceptions import ErrorCode, GFQLValidationError + + entity_meta = cast( + Optional[Dict[str, WholeRowProjectionMeta]], + getattr(result, "_cypher_entity_projection_meta", None), + ) + if not isinstance(entity_meta, dict) or output_name not in entity_meta: + raise GFQLValidationError( + ErrorCode.E108, + message, + field=field, + value=output_name, + suggestion=suggestion, + language="cypher", + ) + return entity_meta[output_name] + + + def _object_text(series: SeriesT) -> SeriesT: out = cast(SeriesT, series) if hasattr(out, "astype"): diff --git a/graphistry/compute/gfql_unified.py b/graphistry/compute/gfql_unified.py index 9e77d8593a..148cf35beb 100644 --- a/graphistry/compute/gfql_unified.py +++ b/graphistry/compute/gfql_unified.py @@ -4,7 +4,7 @@ from dataclasses import replace from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Tuple, Union, cast from graphistry.Plottable import Plottable -from graphistry.Engine import Engine, EngineAbstract, df_concat, df_cons, resolve_engine, safe_merge +from graphistry.Engine import Engine, EngineAbstract, df_concat, df_cons, resolve_engine from graphistry.util import setup_logger from .ast import ASTObject, ASTLet, ASTNode, ASTEdge, ASTCall from .chain import Chain, chain as chain_impl @@ -31,11 +31,22 @@ CompiledCypherQuery, CompiledCypherUnionQuery, ConnectedOptionalMatchPlan, - _reentry_hidden_column_name, ) -from graphistry.compute.gfql.cypher.reentry_plan import ReentryPlan +from graphistry.compute.gfql.cypher.reentry.execution import ( + REENTRY_WHOLE_ROW_SUGGESTION as _REENTRY_WHOLE_ROW_SUGGESTION, + apply_optional_reentry_null_fill as _apply_optional_reentry_null_fill, + compiled_query_freeform_reentry_state as _compiled_query_freeform_reentry_state, + compiled_query_reentry_state as _compiled_query_reentry_state, + compiled_query_scalar_reentry_state as _compiled_query_scalar_reentry_state, + freeform_broadcast_row_to_nodes as _freeform_broadcast_row_to_nodes, + reentry_validation_error as _reentry_validation_error, + union_scalar_reentry_results as _union_scalar_reentry_results, +) from graphistry.compute.gfql.cypher.call_procedures import execute_cypher_call -from graphistry.compute.gfql.cypher.result_postprocess import WholeRowProjectionMeta, apply_result_projection +from graphistry.compute.gfql.cypher.result_postprocess import ( + apply_result_projection, + entity_projection_meta_entry as _entity_projection_meta_entry, +) from graphistry.compute.gfql.df_executor import ( DFSamePathExecutor, build_same_path_inputs, @@ -51,7 +62,7 @@ ) from graphistry.compute.gfql.passes import DEFAULT_LOGICAL_PASSES, DEFAULT_TIER2_PASSES, PassManager from graphistry.compute.gfql.row.pipeline import is_row_pipeline_call -from graphistry.compute.typing import DataFrameT, SeriesT +from graphistry.compute.typing import DataFrameT from graphistry.compute.util.generate_safe_column_name import generate_safe_column_name from graphistry.compute.validate.validate_schema import validate_chain_schema from graphistry.compute.gfql_validate import gfql_validate as gfql_preflight_validate @@ -59,8 +70,6 @@ logger = setup_logger(__name__) -_REENTRY_WHOLE_ROW_SUGGESTION = "Carry a whole-row node alias through WITH before MATCH re-entry." -_REENTRY_SCALAR_SUGGESTION = "Carry scalar columns through WITH before MATCH re-entry." def _series_to_pylist(values: Any) -> List[Any]: if hasattr(values, "to_arrow"): @@ -100,47 +109,6 @@ def _apply_empty_result_row( return out -def _entity_projection_meta_entry( - result: Plottable, - *, - output_name: str, - field: str, - message: str, - suggestion: str, -) -> WholeRowProjectionMeta: - entity_meta = cast( - Optional[Dict[str, WholeRowProjectionMeta]], - getattr(result, "_cypher_entity_projection_meta", None), - ) - if not isinstance(entity_meta, dict) or output_name not in entity_meta: - raise GFQLValidationError( - ErrorCode.E108, - message, - field=field, - value=output_name, - suggestion=suggestion, - language="cypher", - ) - return entity_meta[output_name] - - -def _reentry_validation_error( - message: str, - *, - value: Any, - suggestion: str, - field: str = "with", -) -> GFQLValidationError: - return GFQLValidationError( - ErrorCode.E108, - message, - field=field, - value=value, - suggestion=suggestion, - language="cypher", - ) - - def _apply_optional_null_fill( result: Plottable, *, @@ -1046,417 +1014,6 @@ def _execute_compiled_query_with_reentry( return result -def _apply_optional_reentry_null_fill( - result: Plottable, - *, - prefix_result: Plottable, - engine: Union[EngineAbstract, str], - empty_result_row: Optional[Dict[str, Any]] = None, -) -> Plottable: - """Null-fill result rows for prefix rows that the optional reentry didn't match.""" - prefix_df = getattr(prefix_result, "_nodes", None) - result_df = getattr(result, "_nodes", None) - - if prefix_df is None or len(prefix_df) == 0: - return result - - prefix_rows = len(prefix_df) - result_rows = 0 if result_df is None else len(result_df) - - if result_rows >= prefix_rows: - return result - - concrete_engine = resolve_engine(cast(Any, engine), result) - df_ctor = df_cons(concrete_engine) - concat = df_concat(concrete_engine) - - # Use the compiled empty_result_row template (correct projected column names) - # or fall back to the result's own columns. - if empty_result_row is not None: - null_row = dict(empty_result_row) - elif result_df is not None and len(result_df.columns) > 0: - null_row = {col: None for col in result_df.columns} - else: - null_row = {} - - if result_df is None or len(result_df) == 0: - if null_row: - out = result.bind() - out._nodes = df_ctor([dict(null_row) for _ in range(prefix_rows)]) - return out - return result - - missing_count = prefix_rows - result_rows - fill_rows = [dict(null_row) for _ in range(missing_count)] - - fill_df = df_ctor(fill_rows) - out = result.bind() - out._nodes = concat([result_df, fill_df], ignore_index=True, sort=False) - edges_df = getattr(result, "_edges", None) - if edges_df is not None: - out._edges = edges_df[:0] - return out - - -def _compiled_query_reentry_state( - base_graph: Plottable, - plan: ReentryPlan, - prefix_result: Plottable, - *, - engine: Union[EngineAbstract, str], -) -> Tuple[Plottable, DataFrameT]: - if plan.scalar_only or plan.free_form: - raise _reentry_validation_error( - "Cypher whole-row reentry dispatcher received a non-whole-row ReentryPlan", - value=plan.reentry_alias_name, - suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, - ) - output_name = plan.reentry_alias_name - carried_columns = tuple(plan.scalar_columns) - meta = _entity_projection_meta_entry( - prefix_result, - output_name=output_name, - field="with", - message="Cypher MATCH after WITH could not recover carried node identities from the prefix stage", - suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, - ) - if meta["table"] != "nodes": - raise _reentry_validation_error( - "Cypher MATCH after WITH currently supports node re-entry only", - value=output_name, - suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, - ) - ids = meta["ids"] - id_column = meta["id_column"] - if not hasattr(ids, "dropna"): - raise _reentry_validation_error( - "Cypher MATCH after WITH could not recover carried node identities from the prefix stage", - value=output_name, - suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, - ) - base_nodes = getattr(base_graph, "_nodes", None) - if base_nodes is None or id_column not in base_nodes.columns: - raise _reentry_validation_error( - "Cypher MATCH after WITH could not recover the base node table for re-entry", - value=id_column, - suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, - ) - concrete_engine = resolve_engine(cast(Any, engine), base_graph) - carried_ids, aligned_prefix_rows = _aligned_reentry_rows( - ids=cast(SeriesT, ids), - prefix_rows=getattr(prefix_result, "_nodes", None), - output_name=output_name, - ) - carried_node_ids = cast(DataFrameT, df_cons(concrete_engine)({id_column: carried_ids})) - if not carried_columns: - return base_graph, _ordered_reentry_start_nodes( - node_rows=base_nodes, - carried_node_ids=carried_node_ids, - id_column=id_column, - ) - if aligned_prefix_rows is None: - raise _reentry_validation_error( - "Cypher MATCH after WITH could not recover carried row columns from the prefix stage", - value=output_name, - suggestion=_REENTRY_SCALAR_SUGGESTION, - ) - duplicate_mask = carried_ids.duplicated() - if bool(duplicate_mask.any()) if hasattr(duplicate_mask, "any") else False: - raise _reentry_validation_error( - "Cypher MATCH after WITH carried scalar columns currently require unique carried node rows", - value=output_name, - suggestion="Use a single-node seed WITH shape, or avoid carrying scalar columns into MATCH re-entry.", - ) - - carry_payload = _reentry_carry_payload( - carried_node_ids=carried_node_ids, - prefix_rows=aligned_prefix_rows, - carried_columns=carried_columns, - ) - hidden_columns = [name for name in map(_reentry_hidden_column_name, carried_columns) if name in base_nodes.columns] - merge_base = cast(DataFrameT, base_nodes.drop(columns=hidden_columns)) if hidden_columns else base_nodes - node_rows = cast(DataFrameT, safe_merge(merge_base, carry_payload, on=id_column, how="left")) - - dispatch_graph = base_graph.bind() - dispatch_graph._nodes = node_rows - edges_df = getattr(base_graph, "_edges", None) - if edges_df is not None: - dispatch_graph._edges = edges_df - return dispatch_graph, _ordered_reentry_start_nodes( - node_rows=node_rows, - carried_node_ids=carried_node_ids, - id_column=id_column, - ) - - -def _union_scalar_reentry_results( - row_results: List[Plottable], - *, - base_graph: Plottable, - engine: Union[EngineAbstract, str], -) -> Plottable: - """Union per-row suffix results from a multi-row scalar prefix (#1047).""" - node_frames = [] - for r in row_results: - nodes = getattr(r, "_nodes", None) - if nodes is not None and len(cast(Any, nodes)) > 0: - node_frames.append(nodes) - result = base_graph.bind() - if node_frames: - concrete_engine = resolve_engine(cast(Any, engine), node_frames[0]) - concat = df_concat(concrete_engine) - result._nodes = cast(DataFrameT, concat(node_frames, ignore_index=True)) - else: - base_nodes = getattr(base_graph, "_nodes", None) - result._nodes = cast(DataFrameT, base_nodes.iloc[0:0]) if base_nodes is not None else None - result._edges = getattr(base_graph, "_edges", None) - return result - - -def _compiled_query_scalar_reentry_state( - base_graph: Plottable, - prefix_result: Plottable, - *, - carried_columns: Sequence[str], - engine: Union[EngineAbstract, str], - row_index: int = 0, -) -> Tuple[Plottable, Optional[DataFrameT]]: - prefix_rows = getattr(prefix_result, "_nodes", None) - if prefix_rows is None: - raise _reentry_validation_error( - "Cypher MATCH after WITH scalar-only prefix stages could not recover prefix rows", - value=None, - suggestion="Project scalar columns directly before MATCH re-entry.", - ) - prefix_row_count = len(prefix_rows) - base_nodes = getattr(base_graph, "_nodes", None) - if prefix_row_count == 0: - if base_nodes is None: - return base_graph, None - dispatch_graph = base_graph.bind() - dispatch_graph._nodes = cast(DataFrameT, base_nodes.iloc[0:0]) - edges_df = getattr(base_graph, "_edges", None) - if edges_df is not None: - dispatch_graph._edges = cast(DataFrameT, edges_df.iloc[0:0]) - return dispatch_graph, None - if base_nodes is None: - raise _reentry_validation_error( - "Cypher MATCH after WITH scalar-only prefix stages could not recover the base node table for re-entry", - value=None, - suggestion="Retry with a node-backed graph before MATCH re-entry.", - ) - if not carried_columns: - # Scalar-only prefix with zero carried scalars: keep the full node table. - # Row fan-out/union for multi-row prefixes happens in the caller. - dispatch_graph = base_graph.bind() - dispatch_graph._nodes = base_nodes - edges_df = getattr(base_graph, "_edges", None) - if edges_df is not None: - dispatch_graph._edges = edges_df - return dispatch_graph, None - missing_column = next((name for name in carried_columns if name not in prefix_rows.columns), None) - if missing_column is not None: - raise _reentry_validation_error( - "Cypher MATCH after WITH scalar-only prefix stages could not recover a carried scalar column from the prefix stage", - value=missing_column, - suggestion="Project the scalar column explicitly before MATCH re-entry.", - ) - row = prefix_rows.iloc[row_index] - node_rows = cast( - DataFrameT, - base_nodes.assign( - **{ - _reentry_hidden_column_name(output_name): row[output_name] - for output_name in carried_columns - } - ), - ) - dispatch_graph = base_graph.bind() - dispatch_graph._nodes = node_rows - edges_df = getattr(base_graph, "_edges", None) - if edges_df is not None: - dispatch_graph._edges = edges_df - return dispatch_graph, None - - -def _freeform_broadcast_row_to_nodes( - base_graph: Plottable, - base_nodes: DataFrameT, - prefix_rows: DataFrameT, - plan: ReentryPlan, - *, - row_index: int, -) -> Plottable: - """Build a dispatch graph for a single free-form prefix row. - - Broadcasts that row's carried hidden columns onto every base node so the - trailing MATCH (running global, with `start_nodes=None`) inherits the - carried values via the row pipeline. Used for both single-prefix-row and - multi-prefix-row (#1285) free-form lanes. - """ - row = prefix_rows.iloc[row_index] - broadcast_values: Dict[str, Any] = {} - # Top-level scalar carries (e.g. ``WITH a, b.id AS bid``): the prefix row - # exposes them under their output names; the runtime hidden column on the - # base node table is keyed by ``_reentry_hidden_column_name``. - for col in plan.scalar_columns: - if col in prefix_rows.columns: - broadcast_values[_reentry_hidden_column_name(col)] = row[col] - # Non-source whole-row property carries (slice 4.3b from #1248): the prefix - # row already exposes these under their `__cypher_reentry_*` names; copy - # them across as-is. - for col in prefix_rows.columns: - if isinstance(col, str) and col.startswith("__cypher_reentry_"): - broadcast_values[col] = row[col] - - if broadcast_values: - existing_hidden = [ - c for c in base_nodes.columns - if isinstance(c, str) and c.startswith("__cypher_reentry_") - ] - node_rows = ( - cast(DataFrameT, base_nodes.drop(columns=existing_hidden)) - if existing_hidden - else base_nodes - ) - node_rows = cast(DataFrameT, node_rows.assign(**broadcast_values)) - else: - node_rows = cast(DataFrameT, base_nodes) - - dispatch_graph = base_graph.bind() - dispatch_graph._nodes = node_rows - edges_df = getattr(base_graph, "_edges", None) - if edges_df is not None: - dispatch_graph._edges = edges_df - return dispatch_graph - - -def _compiled_query_freeform_reentry_state( - base_graph: Plottable, - compiled_query: CompiledCypherQuery, - prefix_result: Plottable, - *, - engine: Union[EngineAbstract, str], -) -> Tuple[Plottable, Optional[DataFrameT]]: - """#1263 free-form intermediate MATCH (LDBC SNB IC3 endpoint), single-row. - - The trailing MATCH binds aliases that are NOT in the prefix WITH's carried - whole-row set, so it must run against the full base graph (no carried-id - seed filter). Carried hidden columns from the prefix row are broadcast - onto every base node so the row pipeline carries them through whichever - alias the trailing MATCH binds; downstream WHERE/RETURN expressions - referencing carried-alias properties resolve through those broadcast - columns. - - Single-prefix-row dispatch only. Multi-prefix-row free-form (#1285) is - handled at the caller via a per-row union loop (mirror of the scalar-only - multi-row pattern at ``_execute_compiled_query_with_reentry``). - """ - prefix_rows = getattr(prefix_result, "_nodes", None) - base_nodes = getattr(base_graph, "_nodes", None) - if base_nodes is None: - raise _reentry_validation_error( - "Cypher MATCH after WITH (free-form intermediate MATCH; #1263) " - "could not recover the base node table for re-entry", - value=None, - suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, - ) - if prefix_rows is None or len(prefix_rows) == 0: - # Empty prefix → empty result. Return a graph with empty nodes/edges - # so the suffix produces no rows. - dispatch_graph = base_graph.bind() - dispatch_graph._nodes = cast(DataFrameT, base_nodes.iloc[0:0]) - edges_df = getattr(base_graph, "_edges", None) - if edges_df is not None: - dispatch_graph._edges = cast(DataFrameT, edges_df.iloc[0:0]) - return dispatch_graph, None - # Single-row dispatch only; the caller routes multi-row through the - # per-row union loop in ``_execute_compiled_query_with_reentry``. - if len(prefix_rows) > 1: - raise _reentry_validation_error( - "Cypher MATCH after WITH (free-form intermediate MATCH) single-row " - "dispatcher invoked with a multi-row prefix; the caller should " - "route multi-row free-form through the per-row union loop.", - value=len(prefix_rows), - suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, - ) - - plan = compiled_query.reentry_plan - if plan is None: - # Defensive: caller already gated on plan.free_form, so reaching here - # without a plan is a programmer error. - raise _reentry_validation_error( - "Cypher free-form intermediate MATCH dispatched without a ReentryPlan", - value=None, - suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, - ) - - dispatch_graph = _freeform_broadcast_row_to_nodes( - base_graph, cast(DataFrameT, base_nodes), cast(DataFrameT, prefix_rows), plan, row_index=0, - ) - return dispatch_graph, None - - -def _aligned_reentry_rows( - *, - ids: SeriesT, - prefix_rows: Optional[DataFrameT], - output_name: Optional[str], -) -> Tuple[SeriesT, Optional[DataFrameT]]: - if prefix_rows is not None and len(prefix_rows) != len(ids): - raise _reentry_validation_error( - "Cypher MATCH after WITH metadata row counts disagreed with prefix rows during re-entry", - value=output_name, - suggestion="Retry with a direct whole-row carry through WITH or inspect intermediate row-shaping before MATCH re-entry.", - ) - if not hasattr(ids, "notna"): - raise _reentry_validation_error( - "Cypher MATCH after WITH could not align carried node identities from the prefix stage", - value=output_name, - suggestion=_REENTRY_WHOLE_ROW_SUGGESTION, - ) - - non_null_mask = cast(SeriesT, ids.notna()) - carried_ids = cast(SeriesT, ids[non_null_mask].reset_index(drop=True)) - if prefix_rows is None: - return carried_ids, None - return carried_ids, cast(DataFrameT, prefix_rows.loc[non_null_mask].reset_index(drop=True)) - - -def _reentry_carry_payload( - *, - carried_node_ids: DataFrameT, - prefix_rows: DataFrameT, - carried_columns: Sequence[str], -) -> DataFrameT: - missing_column = next((name for name in carried_columns if name not in prefix_rows.columns), None) - if missing_column is not None: - raise _reentry_validation_error( - "Cypher MATCH after WITH could not recover a carried scalar column from the prefix stage", - value=missing_column, - suggestion="Project the scalar column explicitly before MATCH re-entry.", - ) - return cast( - DataFrameT, - carried_node_ids.assign( - **{ - _reentry_hidden_column_name(output_name): cast(SeriesT, prefix_rows[output_name]).reset_index(drop=True) - for output_name in carried_columns - } - ), - ) - - -def _ordered_reentry_start_nodes( - *, - node_rows: DataFrameT, - carried_node_ids: DataFrameT, - id_column: str, -) -> DataFrameT: - # MATCH re-entry must preserve the WITH row order, not the base node-table order. - return cast(DataFrameT, safe_merge(carried_node_ids, node_rows, on=id_column, how="left")) - - def _materialize_split_alias_columns( result: Plottable, executor: DFSamePathExecutor, diff --git a/graphistry/tests/compute/gfql/cypher/test_lowering.py b/graphistry/tests/compute/gfql/cypher/test_lowering.py index 4f57ebcb86..75a79f363a 100644 --- a/graphistry/tests/compute/gfql/cypher/test_lowering.py +++ b/graphistry/tests/compute/gfql/cypher/test_lowering.py @@ -1738,6 +1738,48 @@ def test_lower_match_query_emits_row_anti_semi_filter_for_bound_alias_negated_wh assert [op.get("type") for op in binding_ops] == ["Node", "Edge", "Node"] +def test_lower_match_query_emits_row_anti_semi_filter_for_bound_alias_negated_bounded_varlen_where_pattern() -> None: + lowered = lower_match_query( + _parse_query("MATCH (a)-[:R]->(b) WHERE NOT (b)-[:R*1..2]->(a) RETURN a.id AS a_id, b.id AS b_id") + ) + + assert len(lowered.row_pre_filters) == 1 + anti = lowered.row_pre_filters[0] + assert isinstance(anti, ASTCall) + assert anti.function == "anti_semi_apply" + assert anti.params.get("join_aliases") == ["b", "a"] + binding_ops = anti.params.get("binding_ops") + assert isinstance(binding_ops, list) + assert [op.get("type") for op in binding_ops] == ["Node", "Edge", "Node"] + edge = binding_ops[1] + assert edge.get("min_hops") == 1 + assert edge.get("max_hops") == 2 + assert edge.get("to_fixed_point") is False + + +def test_lower_match_query_emits_row_marker_for_xor_wrapped_bounded_varlen_where_pattern() -> None: + lowered = lower_match_query( + _parse_query("MATCH (n) WHERE (n)-[:R*2]->() XOR n.id = 'd' RETURN n.id AS id") + ) + + assert len(lowered.row_pre_filters) == 1 + marker = lowered.row_pre_filters[0] + assert isinstance(marker, ASTCall) + assert marker.function == "semi_apply_mark" + assert marker.params.get("join_aliases") == ["n"] + out_col = marker.params.get("out_col") + assert isinstance(out_col, str) and out_col.startswith("__gfql_where_pattern_") + assert lowered.row_where is not None + assert " XOR " in lowered.row_where.text + assert out_col in lowered.row_where.text + binding_ops = marker.params.get("binding_ops") + assert isinstance(binding_ops, list) + edge = binding_ops[1] + assert edge.get("min_hops") == 2 + assert edge.get("max_hops") == 2 + assert edge.get("to_fixed_point") is False + + def test_lower_match_query_rejects_where_pattern_predicate_introducing_new_aliases() -> None: with pytest.raises(GFQLValidationError, match="cannot introduce new aliases"): lower_cypher_query(_parse_query("MATCH (n) WHERE (n)-[r]->(a) RETURN n")) @@ -5312,22 +5354,119 @@ def test_connected_variable_length_typed_mixed() -> None: @pytest.mark.parametrize( - "query", + "query,expected_rows", [ - "MATCH (n) WHERE (n)-[:REL1*2]-() RETURN n", - "MATCH (n) WHERE (n)-[*2]-() RETURN n", - "MATCH (n) WHERE (n)<-[:REL1*1..2]-() RETURN n", - "MATCH (n) WHERE (n)-[:REL1*2]-() AND n.id <> 'a' RETURN n", + ( + "MATCH (n) WHERE (n)-[:REL1*2]->() RETURN n.id AS id ORDER BY id", + [{"id": "a"}, {"id": "b"}, {"id": "c"}], + ), + ( + "MATCH (n) WHERE (n)-[*2]->() RETURN n.id AS id ORDER BY id", + [{"id": "a"}, {"id": "b"}, {"id": "c"}], + ), + ( + "MATCH (n) WHERE (n)<-[:REL1*1..2]-() RETURN n.id AS id ORDER BY id", + [{"id": "b"}, {"id": "c"}, {"id": "d"}], + ), + ( + "MATCH (n) WHERE (n)-[:REL1*2]->() AND n.id <> 'a' RETURN n.id AS id ORDER BY id", + [{"id": "b"}, {"id": "c"}], + ), ], ) -def test_string_cypher_failfast_rejects_bounded_variable_length_where_pattern_predicates(query: str) -> None: - graph = _mk_empty_graph() +def test_string_cypher_executes_bounded_variable_length_where_pattern_predicates( + query: str, + expected_rows: list[dict[str, object]], +) -> None: + graph = _mk_graph( + pd.DataFrame({"id": ["a", "b", "c", "d"]}), + pd.DataFrame( + { + "s": ["a", "b", "c"], + "d": ["b", "c", "d"], + "type": ["REL1", "REL1", "REL1"], + } + ), + ) - with pytest.raises(GFQLValidationError) as exc_info: - graph.gfql(query) + result = graph.gfql(query) + assert result._nodes.to_dict(orient="records") == expected_rows - assert exc_info.value.code == ErrorCode.E108 - assert "WHERE pattern predicates" in exc_info.value.message + +@pytest.mark.parametrize( + "query,expected_rows", + [ + ( + "MATCH (n) WHERE (n)-[:REL1*2]->() OR n.id = 'd' RETURN n.id AS id ORDER BY id", + [{"id": "a"}, {"id": "b"}, {"id": "d"}], + ), + ( + "MATCH (n) WHERE (n)-[:REL1*2]->() XOR n.id = 'd' RETURN n.id AS id ORDER BY id", + [{"id": "a"}, {"id": "b"}, {"id": "d"}], + ), + ( + "MATCH (n) WHERE NOT (n)-[:REL1*2]->() RETURN n.id AS id ORDER BY id", + [{"id": "c"}, {"id": "d"}], + ), + ], +) +def test_string_cypher_executes_bounded_variable_length_where_pattern_boolean_wrappers( + query: str, + expected_rows: list[dict[str, object]], +) -> None: + graph = _mk_graph( + pd.DataFrame({"id": ["a", "b", "c", "d"]}), + pd.DataFrame( + { + "s": ["a", "b", "c"], + "d": ["b", "c", "d"], + "type": ["REL1", "REL1", "REL1"], + } + ), + ) + + result = graph.gfql(query) + assert result._nodes.to_dict(orient="records") == expected_rows + + +def test_string_cypher_executes_conjoined_bounded_varlen_where_predicates_across_edge_types() -> None: + graph = _mk_graph( + pd.DataFrame({"id": ["a", "b", "c", "d", "e"]}), + pd.DataFrame( + { + "s": ["a", "b", "c", "a", "b"], + "d": ["b", "c", "d", "e", "e"], + "type": ["REL1", "REL1", "REL1", "REL2", "REL2"], + } + ), + ) + + rows_forward = graph.gfql( + "MATCH (n) WHERE (n)-[:REL1*2]->() AND (n)-[:REL2*1]->() RETURN n.id AS id ORDER BY id" + )._nodes.to_dict(orient="records") + assert rows_forward == [{"id": "b"}] + + +def test_string_cypher_executes_xor_between_bounded_reverse_and_forward_where_patterns() -> None: + graph = _mk_graph( + pd.DataFrame({"id": ["a", "b", "c", "d"]}), + pd.DataFrame( + { + "s": ["a", "b", "c"], + "d": ["b", "c", "d"], + "type": ["REL1", "REL1", "REL1"], + } + ), + ) + + result = graph.gfql( + "MATCH (n) WHERE (n)<-[:REL1*1..2]-() XOR (n)-[:REL1*2]->() RETURN n.id AS id ORDER BY id" + ) + assert result._nodes.to_dict(orient="records") == [ + {"id": "a"}, + {"id": "c"}, + {"id": "d"}, + ]