From 555d4a6db5f8a01b45987523128d8026cd490e87 Mon Sep 17 00:00:00 2001 From: Alena Rybakina Date: Sat, 13 Jun 2026 15:46:16 +0300 Subject: [PATCH 1/2] orca: fall back for replicated CTE consumed in correlated scalar subqueries Root cause is a blind spot in the #375 slice walker: CollectCTESlices delimits slices only at Motion nodes. A CTE over a DISTRIBUTED REPLICATED table referenced from correlated scalar subqueries is decorrelated by ORCA into CPhysicalCorrelated*NLJoin whose inner side becomes an executor SubPlan running in its own slice -- but there is no Motion at that boundary, so the walker placed the Consumer on the same slice as the Producer. The cross-slice check (prod->sliceId != cons->sliceId) never fired, no fallback happened, and the ShareInputScan writer hung forever in shareinput_writer_waitdone() waiting for DONE acks from reader slices that never run. Teach the walker that the inner (subquery) side of a correlated NL join is a slice boundary too, mirroring the Motion rule. The replicated Consumer in the SubPlan then gets a distinct slice id, the existing check fires, and ORCA falls back to the Postgres optimizer. --- .../gporca/libgpopt/src/base/CUtils.cpp | 33 ++++++++++++++++--- src/test/regress/expected/shared_scan.out | 32 ++++++++++++++++++ .../expected/shared_scan_optimizer.out | 32 ++++++++++++++++++ src/test/regress/sql/shared_scan.sql | 29 ++++++++++++++++ 4 files changed, 122 insertions(+), 4 deletions(-) diff --git a/src/backend/gporca/libgpopt/src/base/CUtils.cpp b/src/backend/gporca/libgpopt/src/base/CUtils.cpp index d114a639449..66fa9c1db56 100644 --- a/src/backend/gporca/libgpopt/src/base/CUtils.cpp +++ b/src/backend/gporca/libgpopt/src/base/CUtils.cpp @@ -922,10 +922,30 @@ struct SCTEInfo typedef CDynamicPtrArray > CTEInfoArray; +// True if the operator is a correlated NL join. Its inner side becomes +// an executor SubPlan that runs in its own slice, so a CTE Consumer +// there is cross-slice w.r.t. a Producer outside -- which can deadlock +// the ShareInputScan writer. We treat the inner side as a slice +// boundary so the check below catches it. +// +// These are all of ORCA's SubPlan-producing operators. Add new ones here. +static BOOL +FCorrelatedNLJoin(COperator *pop) +{ + COperator::EOperatorId eopid = pop->Eopid(); + return (COperator::EopPhysicalCorrelatedInnerNLJoin == eopid || + COperator::EopPhysicalCorrelatedLeftOuterNLJoin == eopid || + COperator::EopPhysicalCorrelatedLeftSemiNLJoin == eopid || + COperator::EopPhysicalCorrelatedInLeftSemiNLJoin == eopid || + COperator::EopPhysicalCorrelatedLeftAntiSemiNLJoin == eopid || + COperator::EopPhysicalCorrelatedNotInLeftAntiSemiNLJoin == eopid); +} + // Walk the physical tree, recording the slice id of every replicated // CTE Producer and every CTE Consumer. Slices are delimited by Motion -// nodes: each non-scalar child of a Motion lives in a fresh slice -- -// same motId-stack idea as in apply_shareinput_xslice. +// nodes and by the SubPlan (inner) side of correlated NL joins: each +// such non-scalar child lives in a fresh slice -- same motId-stack idea +// as in apply_shareinput_xslice. static void CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice, ULONG *pNextSlice, CTEInfoArray *prodInfos, @@ -958,6 +978,7 @@ CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice, } BOOL isMotion = CUtils::FPhysicalMotion(pop); + BOOL isCorrelatedNLJoin = FCorrelatedNLJoin(pop); for (ULONG ul = 0; ul < pexpr->Arity(); ul++) { @@ -971,9 +992,13 @@ CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice, } // Allocate a fresh slice id for each non-scalar child of a - // Motion; otherwise the child stays in the parent's slice. + // Motion, and for the inner (subquery) side of a correlated NL + // join -- which the executor materializes as a SubPlan running + // in its own slice. Otherwise the child stays in the parent's + // slice. (For a NL join child 0 is the outer relation and child + // 1 is the inner/subquery relation.) ULONG childSlice = curSlice; - if (isMotion) + if (isMotion || (isCorrelatedNLJoin && 1 == ul)) { (*pNextSlice)++; childSlice = *pNextSlice; diff --git a/src/test/regress/expected/shared_scan.out b/src/test/regress/expected/shared_scan.out index 1bb16e8a465..75d6f6413a6 100644 --- a/src/test/regress/expected/shared_scan.out +++ b/src/test/regress/expected/shared_scan.out @@ -278,6 +278,38 @@ WITH RESET statement_timeout; DROP TABLE ss_t1, ss_t2; +-- ORCA should also fall back when the replicated CTE is referenced from +-- *correlated* scalar subqueries. These become correlated NL joins whose +-- inner side runs as a SubPlan in its own slice. Counting only Motion +-- nodes misses this, so the ShareInputScan writer hangs waiting for a +-- DONE ack from the reader slice. The walk must treat the correlated-join +-- inner side as a separate slice. +CREATE TABLE ss_c2 (id numeric, refrcode varchar(255), referenceid numeric) + DISTRIBUTED REPLICATED; +CREATE TABLE ss_c1 (id bigint, iscalctrg varchar(15) NOT NULL, + iscalcdetail varchar(15)) + DISTRIBUTED REPLICATED; +INSERT INTO ss_c2 SELECT i, 'A'||(i%5), 101991 FROM generate_series(1, 50000) i; +INSERT INTO ss_c1 + SELECT i, 'A'||(i%5), 'A'||(i%7) FROM generate_series(1, 50000) i; +ANALYZE ss_c1; +ANALYZE ss_c2; +SET statement_timeout = '15s'; +WITH cte AS ( + SELECT id, refrcode FROM ss_c2 WHERE referenceid = 101991 AND id < 25000 + UNION ALL + SELECT id, refrcode FROM ss_c2 WHERE referenceid = 101991 AND id >= 25000 +) + SELECT (SELECT refrcode FROM cte WHERE refrcode = p.iscalctrg LIMIT 1) = 'A1' + AND (SELECT refrcode FROM cte WHERE refrcode = p.iscalcdetail LIMIT 1) = 'A1' AS ok + FROM ss_c1 p WHERE p.id = 1; + ok +---- + t +(1 row) + +RESET statement_timeout; +DROP TABLE ss_c1, ss_c2; -- Test the scenario which already opened many fds -- start_ignore RESET search_path; diff --git a/src/test/regress/expected/shared_scan_optimizer.out b/src/test/regress/expected/shared_scan_optimizer.out index 56919a5fcb4..fbfb446a260 100644 --- a/src/test/regress/expected/shared_scan_optimizer.out +++ b/src/test/regress/expected/shared_scan_optimizer.out @@ -291,6 +291,38 @@ WITH RESET statement_timeout; DROP TABLE ss_t1, ss_t2; +-- ORCA should also fall back when the replicated CTE is referenced from +-- *correlated* scalar subqueries. These become correlated NL joins whose +-- inner side runs as a SubPlan in its own slice. Counting only Motion +-- nodes misses this, so the ShareInputScan writer hangs waiting for a +-- DONE ack from the reader slice. The walk must treat the correlated-join +-- inner side as a separate slice. +CREATE TABLE ss_c2 (id numeric, refrcode varchar(255), referenceid numeric) + DISTRIBUTED REPLICATED; +CREATE TABLE ss_c1 (id bigint, iscalctrg varchar(15) NOT NULL, + iscalcdetail varchar(15)) + DISTRIBUTED REPLICATED; +INSERT INTO ss_c2 SELECT i, 'A'||(i%5), 101991 FROM generate_series(1, 50000) i; +INSERT INTO ss_c1 + SELECT i, 'A'||(i%5), 'A'||(i%7) FROM generate_series(1, 50000) i; +ANALYZE ss_c1; +ANALYZE ss_c2; +SET statement_timeout = '15s'; +WITH cte AS ( + SELECT id, refrcode FROM ss_c2 WHERE referenceid = 101991 AND id < 25000 + UNION ALL + SELECT id, refrcode FROM ss_c2 WHERE referenceid = 101991 AND id >= 25000 +) + SELECT (SELECT refrcode FROM cte WHERE refrcode = p.iscalctrg LIMIT 1) = 'A1' + AND (SELECT refrcode FROM cte WHERE refrcode = p.iscalcdetail LIMIT 1) = 'A1' AS ok + FROM ss_c1 p WHERE p.id = 1; + ok +---- + t +(1 row) + +RESET statement_timeout; +DROP TABLE ss_c1, ss_c2; -- Test the scenario which already opened many fds -- start_ignore RESET search_path; diff --git a/src/test/regress/sql/shared_scan.sql b/src/test/regress/sql/shared_scan.sql index df2d21faf2d..e8b1223b3dd 100644 --- a/src/test/regress/sql/shared_scan.sql +++ b/src/test/regress/sql/shared_scan.sql @@ -150,6 +150,35 @@ WITH RESET statement_timeout; DROP TABLE ss_t1, ss_t2; +-- ORCA should also fall back when the replicated CTE is referenced from +-- *correlated* scalar subqueries. These become correlated NL joins whose +-- inner side runs as a SubPlan in its own slice. Counting only Motion +-- nodes misses this, so the ShareInputScan writer hangs waiting for a +-- DONE ack from the reader slice. The walk must treat the correlated-join +-- inner side as a separate slice. +CREATE TABLE ss_c2 (id numeric, refrcode varchar(255), referenceid numeric) + DISTRIBUTED REPLICATED; +CREATE TABLE ss_c1 (id bigint, iscalctrg varchar(15) NOT NULL, + iscalcdetail varchar(15)) + DISTRIBUTED REPLICATED; +INSERT INTO ss_c2 SELECT i, 'A'||(i%5), 101991 FROM generate_series(1, 50000) i; +INSERT INTO ss_c1 + SELECT i, 'A'||(i%5), 'A'||(i%7) FROM generate_series(1, 50000) i; +ANALYZE ss_c1; +ANALYZE ss_c2; + +SET statement_timeout = '15s'; +WITH cte AS ( + SELECT id, refrcode FROM ss_c2 WHERE referenceid = 101991 AND id < 25000 + UNION ALL + SELECT id, refrcode FROM ss_c2 WHERE referenceid = 101991 AND id >= 25000 +) + SELECT (SELECT refrcode FROM cte WHERE refrcode = p.iscalctrg LIMIT 1) = 'A1' + AND (SELECT refrcode FROM cte WHERE refrcode = p.iscalcdetail LIMIT 1) = 'A1' AS ok + FROM ss_c1 p WHERE p.id = 1; +RESET statement_timeout; +DROP TABLE ss_c1, ss_c2; + -- Test the scenario which already opened many fds -- start_ignore RESET search_path; From 9e126afb879ce7e9141f46adf6d3bcb2c5fd27be Mon Sep 17 00:00:00 2001 From: Alena Rybakina Date: Sat, 13 Jun 2026 16:08:19 +0300 Subject: [PATCH 2/2] Generate a native ORCA plan for a replicated CTE in scalar subqueries A CTE over a DISTRIBUTED REPLICATED table referenced from several scalar subqueries put ORCA's SharedScan Producer and Consumer on different slices, and that cross-slice SharedScan hung. We used to detect the shape before DXL translation and fall back to the Postgres planner; now ORCA handles the repairable case natively. The fix is in apply_shareinput_xslice (src/backend/cdb/cdbmutate.c): a cross-slice Consumer inside a SubPlan over a replicated CTE gets a local copy of the Producer's subtree (Materialize + base Scan) with a fresh share_id in its own slice. The data is on every segment, so the local copy is equivalent. Consumers of the same CTE in one slice share the copy; orphaned original Producers are dropped. This only works when the CTE body is a single replicated base scan. So the pre-DXL check (CUtils) falls back unless the Consumer is inside a correlated NL join's SubPlan AND the Producer body is a single base scan. A UNION ALL (Append) body, or a broadcast/duplicate-hazard join (greengage 51fe92e) where the Consumer is not in a SubPlan, still falls back. --- src/backend/cdb/cdbmutate.c | 513 ++++++++++++++++-- .../gporca/libgpopt/src/base/CUtils.cpp | 188 ++++++- src/include/nodes/relation.h | 24 + src/test/regress/expected/shared_scan.out | 44 +- .../expected/shared_scan_optimizer.out | 46 +- src/test/regress/sql/shared_scan.sql | 20 +- 6 files changed, 771 insertions(+), 64 deletions(-) diff --git a/src/backend/cdb/cdbmutate.c b/src/backend/cdb/cdbmutate.c index 9fb644791c6..d3c3d2b7137 100644 --- a/src/backend/cdb/cdbmutate.c +++ b/src/backend/cdb/cdbmutate.c @@ -2437,6 +2437,12 @@ shareinput_mutator_xslice_1(Node *node, PlannerInfo *root, bool fPop) ctxt->producers[sisc->share_id] = sisc; ctxt->sliceMarks[sisc->share_id] = motId; } + else + { + /* Consumer: count references to original producers */ + if (sisc->share_id < ctxt->orig_producer_count) + ctxt->consumer_counts[sisc->share_id]++; + } } return true; @@ -2487,13 +2493,170 @@ shareinput_mutator_xslice_2(Node *node, PlannerInfo *root, bool fPop) if (shareSliceId != motId) { - ShareType stype = get_plan_share_type(plan_slicemark.plan); + /* + * Check for cross-slice SharedScan with a replicated + * table inside a SubPlan. When the producer runs on + * fewer segments than the consumer, the temp file does + * not exist on all consumer segments. + * + * Fix: give this consumer its own copy of the + * producer's underlying plan (Materialize + Scan) and + * convert it to an intra-slice SHARE_MATERIAL with a + * new share_id. The consumer then materializes data + * locally instead of reading cross-slice temp files. + */ + bool inlined = false; + + if (ctxt->walking_subplan) + { + int origShareId = sisc->share_id; + int existingNewId = -1; + int k; + + /* + * Check if we already inlined a consumer for this + * same original share_id in this same slice. If so, + * make this consumer a reader of the existing inlined + * producer instead of creating another copy. + */ + for (k = 0; k < ctxt->inlined_count; k++) + { + if (ctxt->inlined_orig_ids[k] == origShareId && + ctxt->inlined_mot_ids[k] == motId) + { + existingNewId = ctxt->inlined_new_ids[k]; + break; + } + } + + if (existingNewId >= 0) + { + /* + * Reuse the already-inlined producer: make this + * consumer a reader with the same share_id. + * This is intra-slice sharing, so no need to + * increment nsharer_xslice. + */ + if (origShareId < ctxt->orig_producer_count) + ctxt->consumer_counts[origShareId]--; + + sisc->share_id = existingNewId; + sisc->share_type = SHARE_MATERIAL; + sisc->driver_slice = motId; + + inlined = true; + } + else + { + ShareInputScan *producer = ctxt->producers[origShareId]; + Plan *producerChild = producer->scan.plan.lefttree; + Plan *leaf = producerChild; + + while (leaf && leaf->lefttree) + leaf = leaf->lefttree; + + /* + * Any scan over a base relation shares the Scan + * struct layout (scanrelid at a fixed offset): + * SeqScan, IndexScan, IndexOnlyScan, BitmapHeapScan, + * TidScan, SampleScan, and the Dynamic* variants. + */ + if (leaf && + (IsA(leaf, SeqScan) || + IsA(leaf, IndexScan) || + IsA(leaf, IndexOnlyScan) || + IsA(leaf, BitmapHeapScan) || + IsA(leaf, TidScan) || + IsA(leaf, DynamicSeqScan) || + IsA(leaf, DynamicIndexScan) || + IsA(leaf, DynamicBitmapHeapScan))) + { + Index scanrelid = ((Scan *) leaf)->scanrelid; + List *rtable = glob->finalrtable; - if (stype == SHARE_MATERIAL || stype == SHARE_SORT) - set_plan_share_type_xslice(plan_slicemark.plan); + if (scanrelid > 0 && + scanrelid <= (Index) list_length(rtable)) + { + RangeTblEntry *rte = rt_fetch(scanrelid, rtable); + + if (rte->rtekind == RTE_RELATION) + { + GpPolicy *policy = GpPolicyFetch(rte->relid); + + if (policy && + policy->ptype == POLICYTYPE_REPLICATED && + producerChild) + { + Plan *newChild; + int newShareId; + + pfree(policy); + + /* + * Deep copy the producer's subtree and + * assign a fresh share_id so there is no + * conflict with the original producer. + */ + if (origShareId < ctxt->orig_producer_count) + ctxt->consumer_counts[origShareId]--; + + newChild = (Plan *) copyObject(producerChild); + newShareId = ctxt->producer_count; + ctxt->producer_count++; + + set_plan_share_id(newChild, newShareId); + sisc->share_id = newShareId; + sisc->share_type = SHARE_MATERIAL; + sisc->driver_slice = motId; + plan->lefttree = newChild; + + /* + * Register the new producer so later + * passes can find it. + */ + ctxt->producers = repalloc(ctxt->producers, + ctxt->producer_count * sizeof(ShareInputScan *)); + ctxt->producers[newShareId] = sisc; + ctxt->sliceMarks = repalloc(ctxt->sliceMarks, + ctxt->producer_count * sizeof(int)); + ctxt->sliceMarks[newShareId] = motId; + + /* + * Record the mapping so subsequent consumers + * of the same CTE in this slice can reuse + * this inlined producer. + */ + ctxt->inlined_count++; + ctxt->inlined_orig_ids = repalloc(ctxt->inlined_orig_ids, + ctxt->inlined_count * sizeof(int)); + ctxt->inlined_mot_ids = repalloc(ctxt->inlined_mot_ids, + ctxt->inlined_count * sizeof(int)); + ctxt->inlined_new_ids = repalloc(ctxt->inlined_new_ids, + ctxt->inlined_count * sizeof(int)); + ctxt->inlined_orig_ids[ctxt->inlined_count - 1] = origShareId; + ctxt->inlined_mot_ids[ctxt->inlined_count - 1] = motId; + ctxt->inlined_new_ids[ctxt->inlined_count - 1] = newShareId; + + inlined = true; + } + else if (policy) + pfree(policy); + } + } + } + } + } + + if (!inlined) + { + ShareType stype = get_plan_share_type(plan_slicemark.plan); - incr_plan_nsharer_xslice(plan_slicemark.plan); - sisc->driver_slice = motId; + if (stype == SHARE_MATERIAL || stype == SHARE_SORT) + set_plan_share_type_xslice(plan_slicemark.plan); + + incr_plan_nsharer_xslice(plan_slicemark.plan); + sisc->driver_slice = motId; + } } } } @@ -2611,22 +2774,320 @@ shareinput_mutator_xslice_4(Node *node, PlannerInfo *root, bool fPop) return true; } +/* + * record_subplan_motid_walker + * Walk expressions looking for SubPlan references. For each SubPlan found, + * record the current motId from the motion stack. This tells us what slice + * the SubPlan actually executes in (which is the caller's slice). + */ +static bool +record_subplan_motid_walker(Node *node, ApplyShareInputContext *ctxt) +{ + if (node == NULL) + return false; + + if (IsA(node, SubPlan)) + { + SubPlan *sp = (SubPlan *) node; + int motId = shareinput_peekmot(ctxt); + + if (sp->plan_id >= 1 && sp->plan_id <= ctxt->num_subplans) + ctxt->subplan_motids[sp->plan_id - 1] = motId; + + /* don't recurse into SubPlan's testexpr/args */ + return false; + } + + return expression_tree_walker(node, record_subplan_motid_walker, ctxt); +} + +/* + * shareinput_mutator_build_subplan_motids + * Pre-pass over the main plan tree to build a mapping from SubPlan plan_id + * to the motId (slice) where the SubPlan is referenced. + * + * SubPlan plan trees are walked separately by apply_shareinput_xslice, but + * without motion context from the main plan. This causes the xslice passes + * to incorrectly treat SharedScan consumers in SubPlans as being in slice 0 + * instead of their actual execution slice. By recording the correct motId + * here, we can push it before walking each subplan. + */ + static bool + shareinput_mutator_build_subplan_motids(Node *node, PlannerInfo *root, bool fPop) + { + PlannerGlobal *glob = root->glob; + ApplyShareInputContext *ctxt = &glob->share; + Plan *plan = (Plan *) node; + + if (fPop) + { + if (IsA(plan, Motion)) + shareinput_popmot(ctxt); + return false; + } + + if (IsA(plan, Motion)) + { + Motion *motion = (Motion *) plan; + + shareinput_pushmot(ctxt, motion->motionID); + return true; + } + + /* Scan common expression fields for SubPlan references */ + record_subplan_motid_walker((Node *) plan->targetlist, ctxt); + record_subplan_motid_walker((Node *) plan->qual, ctxt); + + /* + * Check additional node-type-specific expression fields where SubPlan + * references may appear. Modeled after finalize_plan() in subselect.c. + */ + switch (nodeTag(plan)) + { + case T_Result: + record_subplan_motid_walker(((Result *) plan)->resconstantqual, ctxt); + break; + case T_IndexScan: + record_subplan_motid_walker((Node *) ((IndexScan *) plan)->indexqual, ctxt); + record_subplan_motid_walker((Node *) ((IndexScan *) plan)->indexorderby, ctxt); + break; + case T_IndexOnlyScan: + record_subplan_motid_walker((Node *) ((IndexOnlyScan *) plan)->indexqual, ctxt); + record_subplan_motid_walker((Node *) ((IndexOnlyScan *) plan)->indexorderby, ctxt); + break; + case T_BitmapIndexScan: + record_subplan_motid_walker((Node *) ((BitmapIndexScan *) plan)->indexqual, ctxt); + break; + case T_BitmapHeapScan: + record_subplan_motid_walker((Node *) ((BitmapHeapScan *) plan)->bitmapqualorig, ctxt); + break; + case T_TidScan: + record_subplan_motid_walker((Node *) ((TidScan *) plan)->tidquals, ctxt); + break; + case T_ForeignScan: + record_subplan_motid_walker((Node *) ((ForeignScan *) plan)->fdw_exprs, ctxt); + break; + case T_NestLoop: + record_subplan_motid_walker((Node *) ((Join *) plan)->joinqual, ctxt); + break; + case T_MergeJoin: + record_subplan_motid_walker((Node *) ((Join *) plan)->joinqual, ctxt); + record_subplan_motid_walker((Node *) ((MergeJoin *) plan)->mergeclauses, ctxt); + break; + case T_HashJoin: + record_subplan_motid_walker((Node *) ((Join *) plan)->joinqual, ctxt); + record_subplan_motid_walker((Node *) ((HashJoin *) plan)->hashclauses, ctxt); + record_subplan_motid_walker((Node *) ((HashJoin *) plan)->hashqualclauses, ctxt); + break; + case T_Limit: + record_subplan_motid_walker(((Limit *) plan)->limitOffset, ctxt); + record_subplan_motid_walker(((Limit *) plan)->limitCount, ctxt); + break; + case T_Motion: + record_subplan_motid_walker((Node *) ((Motion *) plan)->hashExprs, ctxt); + break; + case T_ModifyTable: + record_subplan_motid_walker((Node *) ((ModifyTable *) plan)->returningLists, ctxt); + break; + case T_ValuesScan: + record_subplan_motid_walker((Node *) ((ValuesScan *) plan)->values_lists, ctxt); + break; + case T_WindowAgg: + record_subplan_motid_walker(((WindowAgg *) plan)->startOffset, ctxt); + record_subplan_motid_walker(((WindowAgg *) plan)->endOffset, ctxt); + break; + case T_PartitionSelector: + record_subplan_motid_walker((Node *) ((PartitionSelector *) plan)->levelEqExpressions, ctxt); + record_subplan_motid_walker((Node *) ((PartitionSelector *) plan)->levelExpressions, ctxt); + record_subplan_motid_walker(((PartitionSelector *) plan)->residualPredicate, ctxt); + record_subplan_motid_walker(((PartitionSelector *) plan)->propagationExpression, ctxt); + record_subplan_motid_walker(((PartitionSelector *) plan)->printablePredicate, ctxt); + record_subplan_motid_walker((Node *) ((PartitionSelector *) plan)->partTabTargetlist, ctxt); + break; + default: + break; + } + + return true; + } + +/* + * shareinput_walk_subplans + * Walk all subplans using the given mutator function, pushing the correct + * motId for each subplan based on where it is referenced in the main plan. + */ +static void +shareinput_walk_subplans(SHAREINPUT_MUTATOR f, PlannerGlobal *glob) +{ + ApplyShareInputContext *ctxt = &glob->share; + ListCell *lp, *lr; + int i = 0; + + forboth(lp, glob->subplans, lr, glob->subroots) + { + Plan *subplan = (Plan *) lfirst(lp); + PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); + + if (ctxt->subplan_motids != NULL && i < ctxt->num_subplans) + shareinput_pushmot(ctxt, ctxt->subplan_motids[i]); + + ctxt->walking_subplan = true; + shareinput_walker(f, (Node *) subplan, subroot); + ctxt->walking_subplan = false; + + if (ctxt->subplan_motids != NULL && i < ctxt->num_subplans) + shareinput_popmot(ctxt); + + i++; + } +} + +/* + * cleanup_orphaned_producers + * After inlining, some original producers may have zero remaining + * consumers. Remove them from Sequence nodes to eliminate unnecessary + * SeqScans. Collapse Sequence nodes that end up with a single child. + */ +static Plan * +cleanup_orphaned_producers(Plan *plan, ApplyShareInputContext *ctxt) +{ + if (plan == NULL) + return NULL; + + if (IsA(plan, Sequence)) + { + Sequence *seq = (Sequence *) plan; + List *newplans = NIL; + ListCell *lc; + + foreach(lc, seq->subplans) + { + Plan *child = (Plan *) lfirst(lc); + + child = cleanup_orphaned_producers(child, ctxt); + + /* Skip orphaned SharedScan producers (those with lefttree) */ + if (IsA(child, ShareInputScan) && child->lefttree != NULL) + { + ShareInputScan *sisc = (ShareInputScan *) child; + + if (sisc->share_id < ctxt->orig_producer_count && + ctxt->consumer_counts[sisc->share_id] == 0) + continue; + } + + /* + * Flatten nested Sequences: if a child is itself a + * Sequence, splice its children into this level. + */ + if (IsA(child, Sequence)) + { + Sequence *inner = (Sequence *) child; + ListCell *lc2; + + foreach(lc2, inner->subplans) + newplans = lappend(newplans, lfirst(lc2)); + } + else + newplans = lappend(newplans, child); + } + + seq->subplans = newplans; + return plan; + } + + if (IsA(plan, Append)) + { + ListCell *lc; + + foreach(lc, ((Append *) plan)->appendplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, MergeAppend)) + { + ListCell *lc; + + foreach(lc, ((MergeAppend *) plan)->mergeplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, ModifyTable)) + { + ListCell *lc; + + foreach(lc, ((ModifyTable *) plan)->plans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, BitmapAnd)) + { + ListCell *lc; + + foreach(lc, ((BitmapAnd *) plan)->bitmapplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, BitmapOr)) + { + ListCell *lc; + + foreach(lc, ((BitmapOr *) plan)->bitmapplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, SubqueryScan)) + { + SubqueryScan *sub = (SubqueryScan *) plan; + + sub->subplan = cleanup_orphaned_producers(sub->subplan, ctxt); + } + else + { + plan->lefttree = cleanup_orphaned_producers(plan->lefttree, ctxt); + plan->righttree = cleanup_orphaned_producers(plan->righttree, ctxt); + } + + return plan; +} + Plan * apply_shareinput_xslice(Plan *plan, PlannerInfo *root) { PlannerGlobal *glob = root->glob; ApplyShareInputContext *ctxt = &glob->share; - ListCell *lp, *lr; ctxt->motStack = NULL; ctxt->qdShares = NULL; ctxt->qdSlices = NULL; ctxt->nextPlanId = 0; + ctxt->walking_subplan = false; + + ctxt->inlined_orig_ids = palloc0(sizeof(int)); + ctxt->inlined_mot_ids = palloc0(sizeof(int)); + ctxt->inlined_new_ids = palloc0(sizeof(int)); + ctxt->inlined_count = 0; + + ctxt->orig_producer_count = ctxt->producer_count; + ctxt->consumer_counts = palloc0(ctxt->producer_count * sizeof(int)); ctxt->sliceMarks = palloc0(ctxt->producer_count * sizeof(int)); shareinput_pushmot(ctxt, 0); + /* + * Pre-pass: build a mapping from SubPlan plan_id to the motId (slice) + * where the SubPlan is referenced in the main plan. This is needed + * because the xslice passes walk subplan trees separately, and without + * this mapping they would use motId=0 for all subplan nodes. + */ + ctxt->num_subplans = list_length(glob->subplans); + if (ctxt->num_subplans > 0) + { + ctxt->subplan_motids = palloc0(ctxt->num_subplans * sizeof(int)); + shareinput_walker(shareinput_mutator_build_subplan_motids, + (Node *) plan, root); + } + else + { + ctxt->subplan_motids = NULL; + } + /* * Walk the tree. See comment for each pass for what each pass will do. * The context is used to carry information from one pass to another, as @@ -2639,43 +3100,27 @@ apply_shareinput_xslice(Plan *plan, PlannerInfo *root) * walk through all plans and collect all producer subplans into the * context, before processing the consumers. */ - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); - - shareinput_walker(shareinput_mutator_xslice_1, (Node *) subplan, subroot); - } + shareinput_walk_subplans(shareinput_mutator_xslice_1, glob); shareinput_walker(shareinput_mutator_xslice_1, (Node *) plan, root); /* Now walk the tree again, and process all the consumers. */ - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); - - shareinput_walker(shareinput_mutator_xslice_2, (Node *) subplan, subroot); - } + shareinput_walk_subplans(shareinput_mutator_xslice_2, glob); shareinput_walker(shareinput_mutator_xslice_2, (Node *) plan, root); - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); - - shareinput_walker(shareinput_mutator_xslice_3, (Node *) subplan, subroot); - } + shareinput_walk_subplans(shareinput_mutator_xslice_3, glob); shareinput_walker(shareinput_mutator_xslice_3, (Node *) plan, root); - forboth(lp, glob->subplans, lr, glob->subroots) - { - Plan *subplan = (Plan *) lfirst(lp); - PlannerInfo *subroot = (PlannerInfo *) lfirst(lr); - - shareinput_walker(shareinput_mutator_xslice_4, (Node *) subplan, subroot); - } + shareinput_walk_subplans(shareinput_mutator_xslice_4, glob); shareinput_walker(shareinput_mutator_xslice_4, (Node *) plan, root); + /* + * Cleanup: remove orphaned SharedScan producers from Sequence nodes. + * After inlining, some original producers may have lost all consumers. + * Keeping them would cause unnecessary SeqScans at execution time. + */ + if (ctxt->inlined_count > 0) + plan = cleanup_orphaned_producers(plan, ctxt); + return plan; } diff --git a/src/backend/gporca/libgpopt/src/base/CUtils.cpp b/src/backend/gporca/libgpopt/src/base/CUtils.cpp index 66fa9c1db56..1a93b0b24e6 100644 --- a/src/backend/gporca/libgpopt/src/base/CUtils.cpp +++ b/src/backend/gporca/libgpopt/src/base/CUtils.cpp @@ -915,7 +915,21 @@ struct SCTEInfo ULONG cteId; ULONG sliceId; - SCTEInfo(ULONG cte_id, ULONG slice_id) : cteId(cte_id), sliceId(slice_id) + // For a Producer: true if its CTE body is a single replicated base + // scan, i.e. apply_shareinput_xslice can materialize it locally per + // Consumer slice. Unused for Consumers. + BOOL repairable; + + // For a Consumer: true if it sits inside the inner (SubPlan) side of a + // correlated NL join. Only such Consumers become executor SubPlans that + // apply_shareinput_xslice rewrites. Unused for Producers. + BOOL withinSubPlan; + + SCTEInfo(ULONG cte_id, ULONG slice_id, BOOL repair, BOOL within_subplan) + : cteId(cte_id), + sliceId(slice_id), + repairable(repair), + withinSubPlan(within_subplan) { } }; @@ -925,8 +939,10 @@ typedef CDynamicPtrArray > CTEInfoArray; // True if the operator is a correlated NL join. Its inner side becomes // an executor SubPlan that runs in its own slice, so a CTE Consumer // there is cross-slice w.r.t. a Producer outside -- which can deadlock -// the ShareInputScan writer. We treat the inner side as a slice -// boundary so the check below catches it. +// the ShareInputScan writer. apply_shareinput_xslice can only repair such +// a Consumer when the CTE body is a single replicated base scan; a +// branching body (e.g. a UNION ALL -> Append) cannot be materialized +// locally, so that shape still has to fall back. // // These are all of ORCA's SubPlan-producing operators. Add new ones here. static BOOL @@ -941,15 +957,92 @@ FCorrelatedNLJoin(COperator *pop) COperator::EopPhysicalCorrelatedNotInLeftAntiSemiNLJoin == eopid); } +// True if the operator is a physical scan over a base relation. These are +// the leaf operators apply_shareinput_xslice recognizes when it walks the +// producer subtree down to a single Scan in order to copy it locally. +static BOOL +FPhysicalBaseTableScan(COperator *pop) +{ + COperator::EOperatorId eopid = pop->Eopid(); + return (COperator::EopPhysicalTableScan == eopid || + COperator::EopPhysicalDynamicTableScan == eopid || + COperator::EopPhysicalIndexScan == eopid || + COperator::EopPhysicalIndexOnlyScan == eopid || + COperator::EopPhysicalDynamicIndexScan == eopid || + COperator::EopPhysicalBitmapTableScan == eopid || + COperator::EopPhysicalDynamicBitmapTableScan == eopid); +} + +// Mirror apply_shareinput_xslice's leaf walk: follow the single relational +// (non-scalar) child down the producer body. The replicated CTE can be +// materialized locally per Consumer slice iff that spine ends in exactly +// one base table scan. A body that branches -- a UNION ALL (-> Append) or a +// join, which has more than one relational child -- has no single lefttree +// for the executor pass to copy, so it is NOT locally repairable and the +// cross-slice Consumer must fall back. +static BOOL +FProducerBodySingleScan(CExpression *pexpr) +{ + COperator *pop = pexpr->Pop(); + + // A Motion inside the body moves rows between segments, so the body is + // not purely-local replicated data the executor pass can copy. Bail + // out (mirrors apache/cloudberry shareinput_subtree_is_replicated). + if (CUtils::FPhysicalMotion(pop)) + { + return false; + } + + if (FPhysicalBaseTableScan(pop)) + { + return true; + } + + CExpression *pexprRelChild = NULL; + ULONG relChildren = 0; + for (ULONG ul = 0; ul < pexpr->Arity(); ul++) + { + if (!(*pexpr)[ul]->Pop()->FScalar()) + { + relChildren++; + pexprRelChild = (*pexpr)[ul]; + } + } + + // Exactly one relational child: keep walking the spine. Zero (a + // non-scan leaf) or several (UnionAll / Append / Join): not a single + // base scan. + if (1 == relChildren) + { + return FProducerBodySingleScan(pexprRelChild); + } + + return false; +} + // Walk the physical tree, recording the slice id of every replicated -// CTE Producer and every CTE Consumer. Slices are delimited by Motion -// nodes and by the SubPlan (inner) side of correlated NL joins: each -// such non-scalar child lives in a fresh slice -- same motId-stack idea -// as in apply_shareinput_xslice. +// CTE Producer and every CTE Consumer. Slices are delimited by +// duplicate-hazard Motion nodes (broadcast of a strict-replicated / +// universal input) and by the SubPlan (inner) side of correlated NL +// joins: each such non-scalar child lives in a fresh slice -- same +// motId-stack idea as in apply_shareinput_xslice. +// +// Plain (non-duplicate-hazard) Motions are deliberately NOT treated as +// slice boundaries here: a replicated CTE Consumer reached only across +// such Motions is repaired natively by apply_shareinput_xslice (local +// materialization), so it must stay in its Producer's slice and not +// trigger a fallback. +// +// For each Producer we also record whether its body is a single +// replicated base scan (repairable), and for each Consumer whether it is +// inside a correlated NL join's inner SubPlan side (withinSubPlan). The +// caller combines the two: a cross-slice Consumer that is withinSubPlan +// over a repairable Producer is fixed locally by apply_shareinput_xslice +// and does NOT fall back; everything else cross-slice does. static void CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice, - ULONG *pNextSlice, CTEInfoArray *prodInfos, - CTEInfoArray *consInfos) + ULONG *pNextSlice, BOOL withinSubPlan, + CTEInfoArray *prodInfos, CTEInfoArray *consInfos) { COperator *pop = pexpr->Pop(); @@ -966,18 +1059,27 @@ CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice, if (FReplicatedLikeDistribution(pdpplan->Pds()->Edt())) { prodInfos->Append(GPOS_NEW(mp) SCTEInfo( - CPhysicalCTEProducer::PopConvert(pop)->UlCTEId(), curSlice)); + CPhysicalCTEProducer::PopConvert(pop)->UlCTEId(), curSlice, + FProducerBodySingleScan(pexprChild), false /*withinSubPlan*/)); } } else if (COperator::EopPhysicalCTEConsumer == pop->Eopid()) { - // Consumer is a leaf -- record (cteId, curSlice) and let the - // caller decide later, once the whole tree has been walked. + // Consumer is a leaf -- record (cteId, curSlice, withinSubPlan) + // and let the caller decide later, once the whole tree has been + // walked. consInfos->Append(GPOS_NEW(mp) SCTEInfo( - CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId(), curSlice)); + CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId(), curSlice, + false /*repairable*/, withinSubPlan)); } BOOL isMotion = CUtils::FPhysicalMotion(pop); + // Only a duplicate-hazard Motion (broadcast / gather of a + // strict-replicated or universal input) opens a slice that + // apply_shareinput_xslice cannot repair locally; plain Motions do + // not. FDuplicateHazardMotion asserts its argument is a Motion, so + // guard the call with isMotion. + BOOL isDupHazardMotion = isMotion && CUtils::FDuplicateHazardMotion(pexpr); BOOL isCorrelatedNLJoin = FCorrelatedNLJoin(pop); for (ULONG ul = 0; ul < pexpr->Arity(); ul++) @@ -992,23 +1094,54 @@ CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice, } // Allocate a fresh slice id for each non-scalar child of a - // Motion, and for the inner (subquery) side of a correlated NL - // join -- which the executor materializes as a SubPlan running - // in its own slice. Otherwise the child stays in the parent's - // slice. (For a NL join child 0 is the outer relation and child - // 1 is the inner/subquery relation.) + // duplicate-hazard Motion, and for the inner (subquery) side of + // a correlated NL join -- which the executor materializes as a + // SubPlan running in its own slice. Otherwise the child stays in + // the parent's slice. (For a NL join child 0 is the outer + // relation and child 1 is the inner/subquery relation.) ULONG childSlice = curSlice; - if (isMotion || (isCorrelatedNLJoin && 1 == ul)) + BOOL childWithinSubPlan = withinSubPlan; + if (isCorrelatedNLJoin && 1 == ul) + { + (*pNextSlice)++; + childSlice = *pNextSlice; + childWithinSubPlan = true; + } + else if (isDupHazardMotion) { (*pNextSlice)++; childSlice = *pNextSlice; } - CollectCTESlices(mp, pexprChild, childSlice, pNextSlice, prodInfos, - consInfos); + CollectCTESlices(mp, pexprChild, childSlice, pNextSlice, + childWithinSubPlan, prodInfos, consInfos); } } +//--------------------------------------------------------------------------- +// @function: +// CUtils::FHasCrossSliceReplicatedCTEConsumer +// +// @doc: +// Detect a replicated CTE Consumer that ends up on a different slice +// than its Producer and whose cross-slice shared-scan topology hangs at +// execution and cannot be repaired by apply_shareinput_xslice. +// +// CollectCTESlices opens a fresh slice at a duplicate-hazard Motion +// (broadcast / gather of a strict-replicated or universal input -- +// greengage 51fe92e) and at the inner (SubPlan) side of a correlated NL +// join. A cross-slice replicated Consumer is left to the native +// local-materialization pass -- and so does NOT fall back -- only when +// both hold: +// * it is inside a correlated NL join's SubPlan (withinSubPlan), and +// * its Producer's body is a single replicated base scan (repairable), +// which is exactly what apply_shareinput_xslice can copy locally. +// +// Everything else cross-slice falls back: a broadcast/duplicate-hazard +// join topology (the Consumer is not in a SubPlan), and a correlated +// scalar subquery whose CTE body is not a single base scan (e.g. a +// UNION ALL -> Append), which the executor pass cannot materialize. +//--------------------------------------------------------------------------- BOOL CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr) { @@ -1021,8 +1154,8 @@ CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr) CTEInfoArray *consInfos = GPOS_NEW(mp) CTEInfoArray(mp); ULONG nextSlice = 0; - CollectCTESlices(mp, pexpr, 0 /*curSlice*/, &nextSlice, prodInfos, - consInfos); + CollectCTESlices(mp, pexpr, 0 /*curSlice*/, &nextSlice, + false /*withinSubPlan*/, prodInfos, consInfos); BOOL cross = false; @@ -1035,6 +1168,15 @@ CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr) SCTEInfo *prod = (*prodInfos)[ip]; if (prod->cteId == cons->cteId && prod->sliceId != cons->sliceId) { + // apply_shareinput_xslice repairs this Consumer locally + // only when it is inside a correlated NL join's SubPlan + // and the Producer body is a single replicated base scan. + // Then it is safe and must not trigger a fallback. + if (cons->withinSubPlan && prod->repairable) + { + continue; + } + cross = true; goto lExit; } diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index b609b3406c2..3d2b976056d 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -96,6 +96,30 @@ typedef struct ApplyShareInputContext int *sliceMarks; /* one for each producer */ int producer_count; + int *subplan_motids; /* motId for each subplan, indexed by plan_id-1 */ + int num_subplans; + + bool walking_subplan; /* true when walking a SubPlan tree */ + + /* + * Track already-inlined cross-slice producers so that a second consumer + * of the same CTE in the same slice can share the inlined copy instead + * of creating yet another independent scan. + * + * Each entry maps (orig_share_id, motId) → new_share_id. + */ + int *inlined_orig_ids; /* original share_id */ + int *inlined_mot_ids; /* slice (motId) where it was inlined */ + int *inlined_new_ids; /* new share_id of the inlined producer */ + int inlined_count; + + /* + * Consumer reference counts for original producers, used to detect + * orphaned producers after inlining (those with zero remaining consumers). + */ + int *consumer_counts; /* consumer count per producer */ + int orig_producer_count; /* producer_count before inlining */ + } ApplyShareInputContext; diff --git a/src/test/regress/expected/shared_scan.out b/src/test/regress/expected/shared_scan.out index 75d6f6413a6..f9223d5ed84 100644 --- a/src/test/regress/expected/shared_scan.out +++ b/src/test/regress/expected/shared_scan.out @@ -250,8 +250,12 @@ where Optimizer: Postgres query optimizer (52 rows) --- ORCA should fallback when a CTE over a replicated table is referenced --- from multiple scalar subqueries. +-- A CTE over a replicated table referenced from multiple scalar subqueries +-- used to hang: ORCA placed the SharedScan consumer on a different slice than +-- the producer and the cross-slice temp-file protocol cannot handle that +-- topology. ORCA now force-inlines a replicated-table CTE (the data is on +-- every segment, so a local copy per consumer is equivalent), producing a +-- correct native plan instead of a cross-slice shared scan. -- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan; -- with fewer rows the bug does not manifest and the test would silently -- pass even without the fix. @@ -263,6 +267,42 @@ CREATE TABLE ss_t2 AS DISTRIBUTED REPLICATED; ANALYZE ss_t1; ANALYZE ss_t2; +-- Plan: the replicated CTE is materialized once into a local Shared Scan +-- co-located with its consumers, and the repeated reference reuses that copy, +-- so ss_t2 is scanned once per CTE -- no cross-slice SharedScan, no duplicates. +EXPLAIN (COSTS OFF) WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) AS result + FROM ss_t1 + LIMIT 1; + QUERY PLAN +-------------------------------------------------- + Limit + InitPlan 1 (returns $0) (slice7) + -> Gather Motion 1:1 (slice2; segments: 1) + -> Seq Scan on ss_t2 + Filter: (id = 1) + InitPlan 2 (returns $1) (slice8) + -> Gather Motion 1:1 (slice3; segments: 1) + -> Seq Scan on ss_t2 ss_t2_1 + Filter: (id = 2) + InitPlan 3 (returns $2) (slice9) + -> Gather Motion 1:1 (slice4; segments: 1) + -> Seq Scan on ss_t2 ss_t2_2 + Filter: (id = 1) + InitPlan 4 (returns $3) (slice6) + -> Gather Motion 1:1 (slice5; segments: 1) + -> Seq Scan on ss_t2 ss_t2_3 + Filter: (id = 2) + -> Gather Motion 3:1 (slice1; segments: 3) + -> Limit + -> Seq Scan on ss_t1 + Optimizer: Postgres query optimizer +(21 rows) + +-- Run it under a timeout to prove it no longer hangs. SET statement_timeout = '15s'; WITH cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), diff --git a/src/test/regress/expected/shared_scan_optimizer.out b/src/test/regress/expected/shared_scan_optimizer.out index fbfb446a260..b4a50b25967 100644 --- a/src/test/regress/expected/shared_scan_optimizer.out +++ b/src/test/regress/expected/shared_scan_optimizer.out @@ -263,8 +263,12 @@ where Optimizer: Postgres query optimizer (52 rows) --- ORCA should fallback when a CTE over a replicated table is referenced --- from multiple scalar subqueries. +-- A CTE over a replicated table referenced from multiple scalar subqueries +-- used to hang: ORCA placed the SharedScan consumer on a different slice than +-- the producer and the cross-slice temp-file protocol cannot handle that +-- topology. ORCA now force-inlines a replicated-table CTE (the data is on +-- every segment, so a local copy per consumer is equivalent), producing a +-- correct native plan instead of a cross-slice shared scan. -- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan; -- with fewer rows the bug does not manifest and the test would silently -- pass even without the fix. @@ -276,6 +280,44 @@ CREATE TABLE ss_t2 AS DISTRIBUTED REPLICATED; ANALYZE ss_t1; ANALYZE ss_t2; +-- Plan: the replicated CTE is materialized once into a local Shared Scan +-- co-located with its consumers, and the repeated reference reuses that copy, +-- so ss_t2 is scanned once per CTE -- no cross-slice SharedScan, no duplicates. +EXPLAIN (COSTS OFF) WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) AS result + FROM ss_t1 + LIMIT 1; + QUERY PLAN +----------------------------------------------------------------------------------- + Gather Motion 3:1 (slice3; segments: 3) + -> Sequence + -> Redistribute Motion 1:3 (slice2) + -> Limit + -> Gather Motion 3:1 (slice1; segments: 3) + -> Limit + -> Result + -> Seq Scan on ss_t1 + SubPlan 1 (slice1; segments: 3) + -> Shared Scan (share slice:id 1:2) + -> Materialize + -> Seq Scan on ss_t2 ss_t2_1 + Filter: (id = 1) + SubPlan 2 (slice1; segments: 3) + -> Shared Scan (share slice:id 1:3) + -> Materialize + -> Seq Scan on ss_t2 + Filter: (id = 2) + SubPlan 3 (slice1; segments: 3) + -> Shared Scan (share slice:id 1:2) + SubPlan 4 (slice1; segments: 3) + -> Shared Scan (share slice:id 1:3) + Optimizer: Pivotal Optimizer (GPORCA) +(23 rows) + +-- Run it under a timeout to prove it no longer hangs. SET statement_timeout = '15s'; WITH cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), diff --git a/src/test/regress/sql/shared_scan.sql b/src/test/regress/sql/shared_scan.sql index e8b1223b3dd..93c6f82f763 100644 --- a/src/test/regress/sql/shared_scan.sql +++ b/src/test/regress/sql/shared_scan.sql @@ -125,8 +125,12 @@ where and (stat.schema_name || '.' ||stat.table_name not in (select table_nm_onl_act from tbls_w_onl_actl_data)) or (stat.schema_name || '.' ||stat.table_name in (select table_nm_onl_act from tbls_w_onl_actl_data)); --- ORCA should fallback when a CTE over a replicated table is referenced --- from multiple scalar subqueries. +-- A CTE over a replicated table referenced from multiple scalar subqueries +-- used to hang: ORCA placed the SharedScan consumer on a different slice than +-- the producer and the cross-slice temp-file protocol cannot handle that +-- topology. ORCA now force-inlines a replicated-table CTE (the data is on +-- every segment, so a local copy per consumer is equivalent), producing a +-- correct native plan instead of a cross-slice shared scan. -- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan; -- with fewer rows the bug does not manifest and the test would silently -- pass even without the fix. @@ -138,7 +142,17 @@ CREATE TABLE ss_t2 AS DISTRIBUTED REPLICATED; ANALYZE ss_t1; ANALYZE ss_t2; - +-- Plan: the replicated CTE is materialized once into a local Shared Scan +-- co-located with its consumers, and the repeated reference reuses that copy, +-- so ss_t2 is scanned once per CTE -- no cross-slice SharedScan, no duplicates. +EXPLAIN (COSTS OFF) WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) AS result + FROM ss_t1 + LIMIT 1; +-- Run it under a timeout to prove it no longer hangs. SET statement_timeout = '15s'; WITH cte1 AS (SELECT v FROM ss_t2 WHERE id = 1),