diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml new file mode 100644 index 0000000000000..bd62ff73e1eac --- /dev/null +++ b/.github/workflows/pr-check.yml @@ -0,0 +1,75 @@ +name: PR regression tests (REL_18_STABLE) + +on: + pull_request: + branches: + - REL_18_STABLE + +jobs: + build-and-test: + runs-on: ubuntu-latest + + steps: + - name: Checkout PR branch + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Install build dependencies + run: | + sudo apt-get update -qq + sudo apt-get install -y -qq \ + gcc make bison flex \ + libreadline-dev zlib1g-dev libssl-dev libicu-dev \ + liblz4-dev libzstd-dev \ + pkg-config \ + perl libipc-run-perl \ + python3 + + - name: Configure + run: | + ./configure \ + --enable-cassert --enable-debug --enable-tap-tests \ + --with-openssl --with-icu --with-lz4 --with-zstd + + - name: Build + run: make -j$(nproc) world-bin + + - name: Run check-world + run: make check-world + + - name: Collect regression artifacts on failure + if: failure() + run: | + mkdir -p /tmp/regress-artifacts + find . \( -name regression.diffs -o -name regression.out \ + -o -name postmaster.log -o -name initdb.log \ + -o -path '*/tmp_check/log/*' \) \ + -exec cp --parents {} /tmp/regress-artifacts/ \; + + - name: Upload regression artifacts + if: failure() + uses: actions/upload-artifact@v4 + with: + name: regression-artifacts + path: /tmp/regress-artifacts/ + if-no-files-found: ignore + + - name: Job summary + if: always() + run: | + if [ "${{ job.status }}" = "success" ]; then + echo "## ✅ check-world passed" >> "$GITHUB_STEP_SUMMARY" + else + echo "## ❌ check-world failed" >> "$GITHUB_STEP_SUMMARY" + echo "" >> "$GITHUB_STEP_SUMMARY" + echo "Download the **regression-artifacts** artifact for details." >> "$GITHUB_STEP_SUMMARY" + echo "" >> "$GITHUB_STEP_SUMMARY" + # Inline first diffs found, if any + for f in $(find . 
-name regression.diffs 2>/dev/null | head -5); do + echo "### ${f}" >> "$GITHUB_STEP_SUMMARY" + echo '```' >> "$GITHUB_STEP_SUMMARY" + head -100 "$f" >> "$GITHUB_STEP_SUMMARY" + echo '```' >> "$GITHUB_STEP_SUMMARY" + done + fi diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index f3e77bda27906..934c36b499ac3 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -44,6 +44,7 @@ #include "jit/jit.h" #include "nodes/nodeFuncs.h" #include "pgstat.h" +#include "storage/bufmgr.h" #include "tcop/tcopprot.h" #include "utils/datum.h" #include "utils/dsa.h" @@ -77,6 +78,7 @@ typedef struct FixedParallelExecutorState dsa_pointer param_exec; int eflags; int jit_flags; + int dirtied_localbufs; /* Just for debugging purposes */ } FixedParallelExecutorState; /* @@ -756,6 +758,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, fpes->param_exec = InvalidDsaPointer; fpes->eflags = estate->es_top_eflags; fpes->jit_flags = estate->es_jit_flags; + fpes->dirtied_localbufs = dirtied_localbufs; shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes); /* Store query string */ @@ -1442,6 +1445,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Get fixed-size state. */ fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); + dirtied_localbufs = fpes->dirtied_localbufs; /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ receiver = ExecParallelGetReceiver(seg, toc); diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index fdc65c2b42b33..09acdb18652d9 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -161,6 +161,7 @@ CreateExecutorState(void) estate->es_use_parallel_mode = false; estate->es_parallel_workers_to_launch = 0; estate->es_parallel_workers_launched = 0; + estate->es_tempbufs_flushed = false; /* Is the backend's temp buffers were flushed? 
*/ estate->es_jit_flags = 0; estate->es_jit = NULL; diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index dc7d1830259f5..572a54df6add2 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -36,6 +36,7 @@ #include "executor/tqueue.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "storage/bufmgr.h" #include "utils/wait_event.h" @@ -161,6 +162,17 @@ ExecGather(PlanState *pstate) { ParallelContext *pcxt; + /* + * Flush temporary buffers if this parallel section contains + * any objects with temporary storage type. Don't bother to do it + * more than once per the query execution. + */ + if (gather->process_temp_tables && !estate->es_tempbufs_flushed) + { + FlushAllBuffers(); + estate->es_tempbufs_flushed = true; + } + /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(outerPlanState(node), diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 15f8459706773..9ab1baf440b13 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -21,6 +21,7 @@ #include "lib/binaryheap.h" #include "miscadmin.h" #include "optimizer/optimizer.h" +#include "storage/bufmgr.h" /* * When we read tuples from workers, it's a good idea to read several at once @@ -205,6 +206,13 @@ ExecGatherMerge(PlanState *pstate) { ParallelContext *pcxt; + /* The same as in the ExecGather */ + if (gm->process_temp_tables && !estate->es_tempbufs_flushed) + { + FlushAllBuffers(); + estate->es_tempbufs_flushed = true; + } + /* Initialize, or re-initialize, shared state needed by workers. 
*/ if (!node->pei) node->pei = ExecInitParallelPlan(outerPlanState(node), diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 8feeed9f303b1..7d7e5cf6f1853 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -601,23 +601,25 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, /* This should only be called for baserels and appendrel children. */ Assert(IS_SIMPLE_REL(rel)); + /* Set if the data source refers temp storage somehow */ + rel->needs_temp_safety = false; + /* Assorted checks based on rtekind. */ switch (rte->rtekind) { case RTE_RELATION: /* - * Currently, parallel workers can't access the leader's temporary - * tables. We could possibly relax this if we wrote all of its - * local buffers at the start of the query and made no changes - * thereafter (maybe we could allow hint bit changes), and if we - * taught the workers to read them. Writing a large number of - * temporary buffers could be expensive, though, and we don't have - * the rest of the necessary infrastructure right now anyway. So - * for now, bail out if we see a temporary table. + * It is not free to process objects with a temporary storage in + * parallel because we need to flush temporary buffers beforehand. + * So, hide this feature under a GUC. 
*/ if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP) - return; + { + if (!extended_parallel_processing) + return; + rel->needs_temp_safety = true; + } /* * Table sampling can be pushed down to workers if the sample @@ -629,7 +631,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, if (proparallel != PROPARALLEL_SAFE) return; - if (!is_parallel_safe(root, (Node *) rte->tablesample->args)) + if (!is_parallel_safe(root, (Node *) rte->tablesample->args, &rel->needs_temp_safety)) return; } @@ -695,7 +697,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, case RTE_FUNCTION: /* Check for parallel-restricted functions. */ - if (!is_parallel_safe(root, (Node *) rte->functions)) + if (!is_parallel_safe(root, (Node *) rte->functions, &rel->needs_temp_safety)) return; break; @@ -705,7 +707,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, case RTE_VALUES: /* Check for parallel-restricted functions. */ - if (!is_parallel_safe(root, (Node *) rte->values_lists)) + if (!is_parallel_safe(root, (Node *) rte->values_lists, &rel->needs_temp_safety)) return; break; @@ -746,14 +748,14 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, * outer join clauses work correctly. It would likely break equivalence * classes, too. */ - if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo)) + if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo, &rel->needs_temp_safety)) return; /* * Likewise, if the relation's outputs are not parallel-safe, give up. * (Usually, they're just Vars, but sometimes they're not.) */ - if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs)) + if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs, &rel->needs_temp_safety)) return; /* We have a winner. 
*/ diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index a290a3e66b6d9..c68ac55ee727c 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -104,6 +104,7 @@ #include "optimizer/plancat.h" #include "optimizer/restrictinfo.h" #include "parser/parsetree.h" +#include "storage/bufmgr.h" #include "utils/lsyscache.h" #include "utils/selfuncs.h" #include "utils/spccache.h" @@ -129,6 +130,7 @@ double seq_page_cost = DEFAULT_SEQ_PAGE_COST; double random_page_cost = DEFAULT_RANDOM_PAGE_COST; +double write_page_cost = DEFAULT_WRITE_PAGE_COST; double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST; double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST; double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST; @@ -164,6 +166,8 @@ bool enable_partition_pruning = true; bool enable_presorted_aggregate = true; bool enable_async_append = true; +bool extended_parallel_processing = true; + typedef struct { PlannerInfo *root; @@ -462,6 +466,10 @@ cost_gather(GatherPath *path, PlannerInfo *root, run_cost = path->subpath->total_cost - path->subpath->startup_cost; + /* Account for flushing dirty temp buffers before launching workers */ + if (path->subpath->parallel_safe == NEEDS_TEMP_FLUSH) + startup_cost += tempbuf_flush_extra_cost(); + /* Parallel setup and communication cost. */ startup_cost += parallel_setup_cost; run_cost += parallel_tuple_cost * path->path.rows; @@ -532,10 +540,22 @@ cost_gather_merge(GatherMergePath *path, PlannerInfo *root, startup_cost += parallel_setup_cost; run_cost += parallel_tuple_cost * path->path.rows * 1.05; + /* + * Fold in the input (subpath) costs, following the same pattern as + * cost_gather: input startup goes into startup_cost, the incremental + * portion goes into run_cost, so total = startup + run naturally. 
+ */ + startup_cost += input_startup_cost; + run_cost += input_total_cost - input_startup_cost; + + /* Account for flushing dirty temp buffers before launching workers */ + if (path->subpath->parallel_safe == NEEDS_TEMP_FLUSH) + startup_cost += tempbuf_flush_extra_cost(); + path->path.disabled_nodes = input_disabled_nodes + (enable_gathermerge ? 0 : 1); - path->path.startup_cost = startup_cost + input_startup_cost; - path->path.total_cost = (startup_cost + run_cost + input_total_cost); + path->path.startup_cost = startup_cost; + path->path.total_cost = (startup_cost + run_cost); } /* @@ -6628,3 +6648,25 @@ compute_gather_rows(Path *path) return clamp_row_est(path->rows * get_parallel_divisor(path)); } + +/* + * Before the launch parallel workers in a SELECT query, the leader process must + * flush all dirty pages in temp buffers to guarantee equal access to the data + * in each parallel worker. + * It seems difficult to calculate specific set of tables, indexes and toasts + * that may be touched inside the subtree. Moreover, stored procedures may also + * scan temporary tables. So, it makes sense to flush all temporary buffers. + * Here we calculate the cost of such operation to allow small queries do not + * activate expensive parallel scan over temp resources. 
+ */ +Cost +tempbuf_flush_extra_cost() +{ + if (!extended_parallel_processing) + /* Fast exit if feature is disabled */ + return 0.0; + + /* Hopefully, we have an statistics on the number of dirtied buffers */ + Assert(dirtied_localbufs >= 0); + return write_page_cost * dirtied_localbufs; +} diff --git a/src/backend/optimizer/path/equivclass.c b/src/backend/optimizer/path/equivclass.c index 441f12f6c50cf..1573ffc5ce0b2 100644 --- a/src/backend/optimizer/path/equivclass.c +++ b/src/backend/optimizer/path/equivclass.c @@ -1015,6 +1015,7 @@ find_computable_ec_member(PlannerInfo *root, { List *emvars; ListCell *lc2; + bool needs_temp_flush = false; /* * We shouldn't be trying to sort by an equivalence class that @@ -1049,9 +1050,11 @@ find_computable_ec_member(PlannerInfo *root, /* * If requested, reject expressions that are not parallel-safe. We * check this last because it's a rather expensive test. + * TODO: Not sure if it is really necessary. */ if (require_parallel_safe && - !is_parallel_safe(root, (Node *) em->em_expr)) + (!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) || + needs_temp_flush)) continue; return em; /* found usable expression */ @@ -1093,6 +1096,7 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel, foreach(lc, target->exprs) { Expr *targetexpr = (Expr *) lfirst(lc); + bool needs_temp_flush = false; em = find_ec_member_matching_expr(ec, targetexpr, rel->relids); if (!em) @@ -1112,7 +1116,8 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel, * check this last because it's a rather expensive test. 
*/ if (require_parallel_safe && - !is_parallel_safe(root, (Node *) em->em_expr)) + (!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) || + needs_temp_flush)) continue; return true; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 06128a9f243e6..4cb45236f7389 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -101,7 +101,7 @@ static Gather *create_gather_plan(PlannerInfo *root, GatherPath *best_path); static Plan *create_projection_plan(PlannerInfo *root, ProjectionPath *best_path, int flags); -static Plan *inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe); +static Plan *inject_projection_plan(Plan *subplan, List *tlist, ParallelSafe parallel_safe); static Sort *create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags); static IncrementalSort *create_incrementalsort_plan(PlannerInfo *root, IncrementalSortPath *best_path, int flags); @@ -297,7 +297,8 @@ static Unique *make_unique_from_sortclauses(Plan *lefttree, List *distinctList); static Unique *make_unique_from_pathkeys(Plan *lefttree, List *pathkeys, int numCols); static Gather *make_gather(List *qptlist, List *qpqual, - int nworkers, int rescan_param, bool single_copy, Plan *subplan); + int nworkers, int rescan_param, bool single_copy, + Plan *subplan, bool process_temp_tables); static SetOp *make_setop(SetOpCmd cmd, SetOpStrategy strategy, List *tlist, Plan *lefttree, Plan *righttree, List *groupList, long numGroups); @@ -1934,12 +1935,14 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path) tlist = build_path_tlist(root, &best_path->path); + Assert(best_path->subpath->parallel_safe > PARALLEL_UNSAFE); gather_plan = make_gather(tlist, NIL, best_path->num_workers, assign_special_exec_param(root), best_path->single_copy, - subplan); + subplan, + best_path->subpath->parallel_safe == NEEDS_TEMP_FLUSH); copy_generic_path_info(&gather_plan->plan, 
&best_path->path); @@ -2114,7 +2117,7 @@ create_projection_plan(PlannerInfo *root, ProjectionPath *best_path, int flags) * to apply (since the tlist might be unsafe even if the child plan is safe). */ static Plan * -inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe) +inject_projection_plan(Plan *subplan, List *tlist, ParallelSafe parallel_safe) { Plan *plan; @@ -2146,7 +2149,7 @@ inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe) * flag of the FDW's own Path node. */ Plan * -change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe) +change_plan_targetlist(Plan *subplan, List *tlist, ParallelSafe tlist_parallel_safe) { /* * If the top plan node can't do projections and its existing target list @@ -2162,7 +2165,7 @@ change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe) { /* Else we can just replace the plan node's tlist */ subplan->targetlist = tlist; - subplan->parallel_safe &= tlist_parallel_safe; + subplan->parallel_safe = tlist_parallel_safe; } return subplan; } @@ -4351,7 +4354,8 @@ create_nestloop_plan(PlannerInfo *root, List *otherclauses; List *nestParams; List *outer_tlist; - bool outer_parallel_safe; + ParallelSafe outer_parallel_safe; + bool needs_temp_flush = false; Relids saveOuterRels = root->curOuterRels; ListCell *lc; @@ -4467,8 +4471,13 @@ create_nestloop_plan(PlannerInfo *root, true); outer_tlist = lappend(outer_tlist, tle); /* ... 
and track whether tlist is (still) parallel-safe */ - if (outer_parallel_safe) - outer_parallel_safe = is_parallel_safe(root, (Node *) phv); + if (outer_parallel_safe > PARALLEL_UNSAFE) + { + if (!is_parallel_safe(root, (Node *) phv, &needs_temp_flush)) + outer_parallel_safe = PARALLEL_UNSAFE; + else if (needs_temp_flush) + outer_parallel_safe = NEEDS_TEMP_FLUSH; + } } if (outer_tlist != outer_plan->targetlist) outer_plan = change_plan_targetlist(outer_plan, outer_tlist, @@ -6992,7 +7001,8 @@ make_gather(List *qptlist, int nworkers, int rescan_param, bool single_copy, - Plan *subplan) + Plan *subplan, + bool process_temp_tables) { Gather *node = makeNode(Gather); Plan *plan = &node->plan; @@ -7006,6 +7016,7 @@ make_gather(List *qptlist, node->single_copy = single_copy; node->invisible = false; node->initParam = NULL; + node->process_temp_tables = process_temp_tables; return node; } diff --git a/src/backend/optimizer/plan/planmain.c b/src/backend/optimizer/plan/planmain.c index 5467e094ca7e0..ac0f3872268d4 100644 --- a/src/backend/optimizer/plan/planmain.c +++ b/src/backend/optimizer/plan/planmain.c @@ -123,7 +123,7 @@ query_planner(PlannerInfo *root, (root->query_level > 1 || debug_parallel_query != DEBUG_PARALLEL_OFF)) final_rel->consider_parallel = - is_parallel_safe(root, parse->jointree->quals); + is_parallel_safe(root, parse->jointree->quals, &final_rel->needs_temp_safety); /* * The only path for it is a trivial Result path. 
We cheat a diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 53eea5a5d3590..d7be9255755dc 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -478,6 +478,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, gather->num_workers = 1; gather->single_copy = true; gather->invisible = (debug_parallel_query == DEBUG_PARALLEL_REGRESS); + gather->process_temp_tables = (best_path->parallel_safe == NEEDS_TEMP_FLUSH); /* Transfer any initPlans to the new top node */ gather->plan.initPlan = top_plan->initPlan; @@ -500,7 +501,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, gather->plan.plan_rows = top_plan->plan_rows; gather->plan.plan_width = top_plan->plan_width; gather->plan.parallel_aware = false; - gather->plan.parallel_safe = false; + gather->plan.parallel_safe = PARALLEL_UNSAFE; /* * Delete the initplans' cost from top_plan. We needn't add it to the @@ -1443,6 +1444,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, List *final_targets; List *final_targets_contain_srfs; bool final_target_parallel_safe; + bool needs_temp_flush = false; RelOptInfo *current_rel; RelOptInfo *final_rel; FinalPathExtraData extra; @@ -1494,7 +1496,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, /* And check whether it's parallel safe */ final_target_parallel_safe = - is_parallel_safe(root, (Node *) final_target->exprs); + is_parallel_safe(root, (Node *) final_target->exprs, &needs_temp_flush); /* The setop result tlist couldn't contain any SRFs */ Assert(!parse->hasTargetSRFs); @@ -1664,7 +1666,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, */ final_target = create_pathtarget(root, root->processed_tlist); final_target_parallel_safe = - is_parallel_safe(root, (Node *) final_target->exprs); + is_parallel_safe(root, (Node *) final_target->exprs, &needs_temp_flush); /* * If ORDER BY was given, consider 
whether we should use a post-sort @@ -1677,7 +1679,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, final_target, &have_postponed_srfs); sort_input_target_parallel_safe = - is_parallel_safe(root, (Node *) sort_input_target->exprs); + is_parallel_safe(root, (Node *) sort_input_target->exprs, &needs_temp_flush); } else { @@ -1696,7 +1698,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, final_target, activeWindows); grouping_target_parallel_safe = - is_parallel_safe(root, (Node *) grouping_target->exprs); + is_parallel_safe(root, (Node *) grouping_target->exprs, &needs_temp_flush); } else { @@ -1715,7 +1717,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, { scanjoin_target = make_group_input_target(root, final_target); scanjoin_target_parallel_safe = - is_parallel_safe(root, (Node *) scanjoin_target->exprs); + is_parallel_safe(root, (Node *) scanjoin_target->exprs, &needs_temp_flush); } else { @@ -1767,6 +1769,18 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, scanjoin_targets_contain_srfs = NIL; } + /* + * Each path may have individual target containing or not references to + * relations with temporary storages. There were attempts to do it + * smartly that end up with a new Target::needs_temp_flush field that + * seems too invasive for this first attempt. + * So, just set current_rel flag as needed for temp buffers flushing and + * let Gather to do the job earlier than it could be. + * XXX: we need to be sure that no one new path created with all these + * target lists till now. + */ + current_rel->needs_temp_safety |= needs_temp_flush; + /* Apply scan/join target. */ scanjoin_target_same_exprs = list_length(scanjoin_targets) == 1 && equal(scanjoin_target->exprs, current_rel->reltarget->exprs); @@ -1875,9 +1889,13 @@ grouping_planner(PlannerInfo *root, double tuple_fraction, * query. 
*/ if (current_rel->consider_parallel && - is_parallel_safe(root, parse->limitOffset) && - is_parallel_safe(root, parse->limitCount)) + is_parallel_safe(root, parse->limitOffset, &needs_temp_flush) && + is_parallel_safe(root, parse->limitCount, &needs_temp_flush)) + { final_rel->consider_parallel = true; + final_rel->needs_temp_safety |= + current_rel->needs_temp_safety | needs_temp_flush; + } /* * If the current_rel belongs to a single FDW, so does the final_rel. @@ -3921,8 +3939,11 @@ make_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, * target list and HAVING quals are parallel-safe. */ if (input_rel->consider_parallel && target_parallel_safe && - is_parallel_safe(root, (Node *) havingQual)) + is_parallel_safe(root, (Node *) havingQual, &grouped_rel->needs_temp_safety)) + { grouped_rel->consider_parallel = true; + grouped_rel->needs_temp_safety |= input_rel->needs_temp_safety; + } /* * If the input rel belongs to a single FDW, so does the grouped rel. @@ -4550,8 +4571,11 @@ create_window_paths(PlannerInfo *root, * target list and active windows for non-parallel-safe constructs. */ if (input_rel->consider_parallel && output_target_parallel_safe && - is_parallel_safe(root, (Node *) activeWindows)) + is_parallel_safe(root, (Node *) activeWindows, &window_rel->needs_temp_safety)) + { window_rel->consider_parallel = true; + window_rel->needs_temp_safety |= input_rel->needs_temp_safety; + } /* * If the input rel belongs to a single FDW, so does the window rel. @@ -7048,10 +7072,12 @@ plan_create_index_workers(Oid tableOid, Oid indexOid) * Currently, parallel workers can't access the leader's temporary tables. * Furthermore, any index predicate or index expressions must be parallel * safe. + * TODO: Is this hard to enable? 
*/ if (heap->rd_rel->relpersistence == RELPERSISTENCE_TEMP || - !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index)) || - !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index))) + !is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index), &rel->needs_temp_safety) || + !is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index), &rel->needs_temp_safety) || + rel->needs_temp_safety) { parallel_workers = 0; goto done; diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index d706546f33264..6f163d522db0e 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -1561,7 +1561,7 @@ clean_up_removed_plan_level(Plan *parent, Plan *child) child->startup_cost += initplan_cost; child->total_cost += initplan_cost; if (unsafe_initplans) - child->parallel_safe = false; + child->parallel_safe = PARALLEL_UNSAFE; /* * Attach plans this way so that parent's initplans are processed diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index 0e567890c72ac..beafe640f5890 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -1004,7 +1004,7 @@ SS_process_ctes(PlannerInfo *root) * CTE scans are not considered for parallelism (cf * set_rel_consider_parallel). 
*/ - splan->parallel_safe = false; + splan->parallel_safe = PARALLEL_UNSAFE; splan->setParam = NIL; splan->parParam = NIL; splan->args = NIL; @@ -2272,7 +2272,7 @@ SS_charge_for_initplans(PlannerInfo *root, RelOptInfo *final_rel) path->startup_cost += initplan_cost; path->total_cost += initplan_cost; if (unsafe_initplans) - path->parallel_safe = false; + path->parallel_safe = PARALLEL_UNSAFE; } /* diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 8e44e92057d9a..a88fc9369582b 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -92,6 +92,7 @@ typedef struct char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ + bool hasTempObject; } max_parallel_hazard_context; static bool contain_agg_clause_walker(Node *node, void *context); @@ -736,6 +737,7 @@ max_parallel_hazard(Query *parse) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_UNSAFE; context.safe_param_ids = NIL; + context.hasTempObject = false; (void) max_parallel_hazard_walker((Node *) parse, &context); return context.max_hazard; } @@ -746,13 +748,17 @@ max_parallel_hazard(Query *parse) * * root->glob->maxParallelHazard must previously have been set to the * result of max_parallel_hazard() on the whole query. + * + * Expression may contain a reference to subplan that employs temporary + * relations. That's why the flag needs_temp_flush is introduced. 
*/ bool -is_parallel_safe(PlannerInfo *root, Node *node) +is_parallel_safe(PlannerInfo *root, Node *node, bool *needs_temp_flush) { max_parallel_hazard_context context; PlannerInfo *proot; ListCell *l; + bool is_safe; /* * Even if the original querytree contained nothing unsafe, we need to @@ -767,6 +773,7 @@ is_parallel_safe(PlannerInfo *root, Node *node) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; context.safe_param_ids = NIL; + context.hasTempObject = false; /* * The params that refer to the same or parent query level are considered @@ -784,7 +791,20 @@ is_parallel_safe(PlannerInfo *root, Node *node) } } - return !max_parallel_hazard_walker(node, &context); + is_safe = !max_parallel_hazard_walker(node, &context); + + /* + * If the expression is parallel-safe, detect if it needs temp buffers + * flushing before the execution start. Don't care changing it if + * the expression is unsafe - it can't be executed by parallel workers + * anyway. + * In some cases user is interested in only negative result to test an idea. + * So, if incoming poointer is NULL, skip this step. 
+ */ + if (needs_temp_flush && is_safe && context.hasTempObject) + *needs_temp_flush = NEEDS_TEMP_FLUSH; + + return is_safe; } /* core logic for all parallel-hazard checks */ @@ -906,6 +926,8 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) return true; save_safe_param_ids = context->safe_param_ids; + context->hasTempObject = + context->hasTempObject || (subplan->parallel_safe == NEEDS_TEMP_FLUSH); context->safe_param_ids = list_concat_copy(context->safe_param_ids, subplan->paramIds); if (max_parallel_hazard_walker(subplan->testexpr, context)) diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index e8d8a537061fe..0432f5027c890 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -972,6 +972,44 @@ add_partial_path_precheck(RelOptInfo *parent_rel, int disabled_nodes, return true; } +static inline ParallelSafe +parallel_safety(RelOptInfo *rel) +{ + if (!rel->consider_parallel) + return PARALLEL_UNSAFE; + + if (rel->needs_temp_safety) + return NEEDS_TEMP_FLUSH; + + return PARALLEL_SAFE; +} + +static inline ParallelSafe +compute_parallel_safety(PlannerInfo *root, RelOptInfo *rel, + PathTarget *target, Path *subpath) +{ + ParallelSafe level = PARALLEL_SAFE; + bool needs_temp_flush = false; + + if (!rel->consider_parallel) + return PARALLEL_UNSAFE; + + if (rel->needs_temp_safety) + level = NEEDS_TEMP_FLUSH; + + if (subpath) + level = Min(level, subpath->parallel_safe); + + if (target) + { + if (!is_parallel_safe(root, (Node *) target->exprs, &needs_temp_flush)) + return PARALLEL_UNSAFE; + + if (needs_temp_flush) + level = Min(level, NEEDS_TEMP_FLUSH); + } + return level; +} /***************************************************************************** * PATH NODE CREATION ROUTINES @@ -994,7 +1032,7 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = 
get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = (parallel_workers > 0); - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = parallel_workers; pathnode->pathkeys = NIL; /* seqscan has unordered result */ @@ -1018,7 +1056,7 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* samplescan has unordered result */ @@ -1070,7 +1108,7 @@ create_index_path(PlannerInfo *root, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = pathkeys; @@ -1113,7 +1151,7 @@ create_bitmap_heap_path(PlannerInfo *root, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = (parallel_degree > 0); - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = parallel_degree; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1165,7 +1203,7 @@ create_bitmap_and_path(PlannerInfo *root, * without actually iterating over the list of children. */ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1217,7 +1255,7 @@ create_bitmap_or_path(PlannerInfo *root, * without actually iterating over the list of children. 
*/ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1246,7 +1284,7 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1275,7 +1313,7 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* always unordered */ @@ -1336,7 +1374,7 @@ create_append_path(PlannerInfo *root, required_outer); pathnode->path.parallel_aware = parallel_aware; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -1377,8 +1415,8 @@ create_append_path(PlannerInfo *root, { Path *subpath = (Path *) lfirst(l); - pathnode->path.parallel_safe = pathnode->path.parallel_safe && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(pathnode->path.parallel_safe, + subpath->parallel_safe); /* All child paths must have same parameterization */ Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); @@ -1494,7 +1532,7 @@ create_merge_append_path(PlannerInfo *root, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - 
pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = pathkeys; pathnode->subpaths = subpaths; @@ -1523,8 +1561,8 @@ create_merge_append_path(PlannerInfo *root, Assert(bms_is_empty(PATH_REQ_OUTER(subpath))); pathnode->path.rows += subpath->rows; - pathnode->path.parallel_safe = pathnode->path.parallel_safe && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(pathnode->path.parallel_safe, + subpath->parallel_safe); if (pathkeys_contained_in(pathkeys, subpath->pathkeys)) { @@ -1596,7 +1634,7 @@ create_group_result_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = target; pathnode->path.param_info = NULL; /* there are no other rels... */ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; pathnode->quals = havingqual; @@ -1645,8 +1683,8 @@ create_material_path(RelOptInfo *rel, Path *subpath) pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(parallel_safety(rel), + subpath->parallel_safe); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = subpath->pathkeys; @@ -1680,8 +1718,7 @@ create_memoize_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(parallel_safety(rel), subpath->parallel_safe); pathnode->path.parallel_workers = subpath->parallel_workers; 
pathnode->path.pathkeys = subpath->pathkeys; @@ -1909,8 +1946,7 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* @@ -2190,7 +2226,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /* Gather has unordered result */ @@ -2233,8 +2269,8 @@ create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = Min(parallel_safety(rel), + subpath->parallel_safe); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; pathnode->subpath = subpath; @@ -2262,7 +2298,7 @@ create_functionscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = pathkeys; @@ -2288,7 +2324,7 @@ create_tablefuncscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - 
pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2314,7 +2350,7 @@ create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2340,7 +2376,7 @@ create_ctescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = pathkeys; @@ -2366,7 +2402,7 @@ create_namedtuplestorescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2392,7 +2428,7 @@ create_resultscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2418,7 +2454,7 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = 
parallel_safety(rel); pathnode->parallel_workers = 0; pathnode->pathkeys = NIL; /* result is always unordered */ @@ -2461,7 +2497,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.disabled_nodes = disabled_nodes; @@ -2515,7 +2551,7 @@ create_foreign_join_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = target ? target : rel->reltarget; pathnode->path.param_info = NULL; /* XXX see above */ pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.disabled_nodes = disabled_nodes; @@ -2564,7 +2600,7 @@ create_foreign_upper_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = target ? target : rel->reltarget; pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_safe = parallel_safety(rel); pathnode->path.parallel_workers = 0; pathnode->path.rows = rows; pathnode->path.disabled_nodes = disabled_nodes; @@ -2728,8 +2764,9 @@ create_nestloop_path(PlannerInfo *root, required_outer, &restrict_clauses); pathnode->jpath.path.parallel_aware = false; - pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && - outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.parallel_safe = Min(Min(parallel_safety(joinrel), + outer_path->parallel_safe), + inner_path->parallel_safe); /* This is a foolish way to estimate parallel_workers, but for now... 
*/ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; pathnode->jpath.path.pathkeys = pathkeys; @@ -2794,8 +2831,9 @@ create_mergejoin_path(PlannerInfo *root, required_outer, &restrict_clauses); pathnode->jpath.path.parallel_aware = false; - pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && - outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.parallel_safe = Min(Min(parallel_safety(joinrel), + outer_path->parallel_safe), + inner_path->parallel_safe); /* This is a foolish way to estimate parallel_workers, but for now... */ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; pathnode->jpath.path.pathkeys = pathkeys; @@ -2860,8 +2898,9 @@ create_hashjoin_path(PlannerInfo *root, &restrict_clauses); pathnode->jpath.path.parallel_aware = joinrel->consider_parallel && parallel_hash; - pathnode->jpath.path.parallel_safe = joinrel->consider_parallel && - outer_path->parallel_safe && inner_path->parallel_safe; + pathnode->jpath.path.parallel_safe = Min(Min(parallel_safety(joinrel), + outer_path->parallel_safe), + inner_path->parallel_safe); /* This is a foolish way to estimate parallel_workers, but for now... 
*/ pathnode->jpath.path.parallel_workers = outer_path->parallel_workers; @@ -2929,9 +2968,7 @@ create_projection_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe && - is_parallel_safe(root, (Node *) target->exprs); + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, target, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* Projection does not change the sort order */ pathnode->path.pathkeys = subpath->pathkeys; @@ -3039,9 +3076,12 @@ apply_projection_to_path(PlannerInfo *root, * arrange for the subpath to return the required target list so that * workers can help project. But if there is something that is not * parallel-safe in the target expressions, then we can't. + * + * XXX: don't need flag here because create_projection_path will check the + * target safety anyway. */ if ((IsA(path, GatherPath) || IsA(path, GatherMergePath)) && - is_parallel_safe(root, (Node *) target->exprs)) + is_parallel_safe(root, (Node *) target->exprs, NULL)) { /* * We always use create_projection_path here, even if the subpath is @@ -3075,14 +3115,14 @@ apply_projection_to_path(PlannerInfo *root, } } else if (path->parallel_safe && - !is_parallel_safe(root, (Node *) target->exprs)) + !is_parallel_safe(root, (Node *) target->exprs, NULL)) { /* * We're inserting a parallel-restricted target list into a path * currently marked parallel-safe, so we have to mark it as no longer * safe. 
*/ - path->parallel_safe = false; + path->parallel_safe = PARALLEL_UNSAFE; } return path; @@ -3113,9 +3153,7 @@ create_set_projection_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe && - is_parallel_safe(root, (Node *) target->exprs); + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, target, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* Projection does not change the sort order XXX? */ pathnode->path.pathkeys = subpath->pathkeys; @@ -3185,8 +3223,7 @@ create_incremental_sort_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -3233,8 +3270,7 @@ create_sort_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.pathkeys = pathkeys; @@ -3279,8 +3315,7 @@ create_group_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); 
pathnode->path.parallel_workers = subpath->parallel_workers; /* Group doesn't change sort ordering */ pathnode->path.pathkeys = subpath->pathkeys; @@ -3338,8 +3373,7 @@ create_upper_unique_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* Unique doesn't change the input ordering */ pathnode->path.pathkeys = subpath->pathkeys; @@ -3395,8 +3429,7 @@ create_agg_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; if (aggstrategy == AGG_SORTED) @@ -3479,8 +3512,7 @@ create_groupingsets_path(PlannerInfo *root, pathnode->path.pathtarget = target; pathnode->path.param_info = subpath->param_info; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->subpath = subpath; @@ -3640,7 +3672,7 @@ create_minmaxagg_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = true; /* might change below */ + pathnode->path.parallel_safe = PARALLEL_SAFE; /* might change below */ pathnode->path.parallel_workers = 0; /* Result is one unordered row */ 
pathnode->path.rows = 1; @@ -3658,7 +3690,7 @@ create_minmaxagg_path(PlannerInfo *root, initplan_disabled_nodes += mminfo->path->disabled_nodes; initplan_cost += mminfo->pathcost; if (!mminfo->path->parallel_safe) - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; } /* add tlist eval cost for each output row, plus cpu_tuple_cost */ @@ -3686,10 +3718,16 @@ create_minmaxagg_path(PlannerInfo *root, * we are in a subquery then it can be useful for the outer query to know * that this one is parallel-safe.) */ - if (pathnode->path.parallel_safe) - pathnode->path.parallel_safe = - is_parallel_safe(root, (Node *) target->exprs) && - is_parallel_safe(root, (Node *) quals); + if (pathnode->path.parallel_safe > PARALLEL_UNSAFE) + { + bool needs_temp_flush = false; + + if (!is_parallel_safe(root, (Node *) target->exprs, &needs_temp_flush) || + !is_parallel_safe(root, (Node *) quals, &needs_temp_flush)) + pathnode->path.parallel_safe = PARALLEL_UNSAFE; + else if (needs_temp_flush) + pathnode->path.parallel_safe = NEEDS_TEMP_FLUSH; + } return pathnode; } @@ -3734,8 +3772,7 @@ create_windowagg_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); pathnode->path.parallel_workers = subpath->parallel_workers; /* WindowAgg preserves the input sort order */ pathnode->path.pathkeys = subpath->pathkeys; @@ -3804,8 +3841,7 @@ create_setop_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - leftpath->parallel_safe && rightpath->parallel_safe; + pathnode->path.parallel_safe = 
Min(compute_parallel_safety(root, rel, NULL, leftpath), rightpath->parallel_safe); pathnode->path.parallel_workers = leftpath->parallel_workers + rightpath->parallel_workers; /* SetOp preserves the input sort order if in sort mode */ @@ -3921,8 +3957,7 @@ create_recursiveunion_path(PlannerInfo *root, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - leftpath->parallel_safe && rightpath->parallel_safe; + pathnode->path.parallel_safe = Min(compute_parallel_safety(root, rel, NULL, leftpath), rightpath->parallel_safe); /* Foolish, but we'll do it like joins for now: */ pathnode->path.parallel_workers = leftpath->parallel_workers; /* RecursiveUnion result is always unsorted */ @@ -3961,7 +3996,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; pathnode->path.parallel_workers = 0; pathnode->path.rows = subpath->rows; @@ -4043,7 +4078,7 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; + pathnode->path.parallel_safe = PARALLEL_UNSAFE; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; @@ -4130,8 +4165,7 @@ create_limit_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = rel->consider_parallel && - subpath->parallel_safe; + pathnode->path.parallel_safe = compute_parallel_safety(root, rel, NULL, subpath); 
pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->path.rows = subpath->rows; pathnode->path.disabled_nodes = subpath->disabled_nodes; diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c index ff507331a061a..7482fe9a12452 100644 --- a/src/backend/optimizer/util/relnode.c +++ b/src/backend/optimizer/util/relnode.c @@ -211,6 +211,7 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptInfo *parent) rel->consider_startup = (root->tuple_fraction > 0); rel->consider_param_startup = false; /* might get changed later */ rel->consider_parallel = false; /* might get changed later */ + rel->needs_temp_safety = false; /* might get changed later */ rel->reltarget = create_empty_pathtarget(); rel->pathlist = NIL; rel->ppilist = NIL; @@ -707,6 +708,7 @@ build_join_rel(PlannerInfo *root, joinrel->consider_startup = (root->tuple_fraction > 0); joinrel->consider_param_startup = false; joinrel->consider_parallel = false; + joinrel->needs_temp_safety = false; joinrel->reltarget = create_empty_pathtarget(); joinrel->pathlist = NIL; joinrel->ppilist = NIL; @@ -840,9 +842,13 @@ build_join_rel(PlannerInfo *root, * here. */ if (inner_rel->consider_parallel && outer_rel->consider_parallel && - is_parallel_safe(root, (Node *) restrictlist) && - is_parallel_safe(root, (Node *) joinrel->reltarget->exprs)) + is_parallel_safe(root, (Node *) restrictlist, &joinrel->needs_temp_safety) && + is_parallel_safe(root, (Node *) joinrel->reltarget->exprs, &joinrel->needs_temp_safety)) + { joinrel->consider_parallel = true; + joinrel->needs_temp_safety |= + (inner_rel->needs_temp_safety | outer_rel->needs_temp_safety); + } /* Add the joinrel to the PlannerInfo. 
*/ add_join_rel(root, joinrel); diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 6ba5e2075341f..da1fa788a74cc 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -5013,6 +5013,12 @@ FlushRelationBuffers(Relation rel) } } +void +FlushAllBuffers(void) +{ + FlushAllLocalBuffers(); +} + /* --------------------------------------------------------------------- * FlushRelationsAllBuffers * diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index ba26627f7b00d..e32818b8938d9 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -43,6 +43,10 @@ typedef struct int NLocBuffer = 0; /* until buffers are initialized */ + +int allocated_localbufs = 0; +int dirtied_localbufs = 0; + BufferDesc *LocalBufferDescriptors = NULL; Block *LocalBufferBlockPointers = NULL; int32 *LocalRefCount = NULL; @@ -184,6 +188,12 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln) instr_time io_start; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); + /* + * Parallel temp table scan allows an access to temp tables. So, to be + * paranoid enough we should check it each time, flushing local buffer. 
+ */ + Assert(!IsParallelWorker()); + Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0); /* @@ -507,7 +517,10 @@ MarkLocalBufferDirty(Buffer buffer) buf_state = pg_atomic_read_u32(&bufHdr->state); if (!(buf_state & BM_DIRTY)) + { pgBufferUsage.local_blks_dirtied++; + dirtied_localbufs++; + } buf_state |= BM_DIRTY; @@ -568,6 +581,12 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit /* Clear earlier errors, if this IO failed, it'll be marked again */ buf_state &= ~BM_IO_ERROR; + if (buf_state & BM_DIRTY) + { + Assert(dirtied_localbufs > 0); + dirtied_localbufs--; + } + if (clear_dirty) buf_state &= ~BM_DIRTY; @@ -607,6 +626,12 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced) uint32 buf_state; LocalBufferLookupEnt *hresult; + if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY) + { + Assert(dirtied_localbufs > 0); + dirtied_localbufs--; + } + /* * It's possible that we started IO on this buffer before e.g. aborting * the transaction that created a table. We need to wait for that IO to @@ -722,19 +747,6 @@ InitLocalBuffers(void) HASHCTL info; int i; - /* - * Parallel workers can't access data in temporary tables, because they - * have no visibility into the local buffers of their leader. This is a - * convenient, low-cost place to provide a backstop check for that. Note - * that we don't wish to prevent a parallel worker from accessing catalog - * metadata about a temp table, so checks at higher levels would be - * inappropriate. 
- */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot access temporary tables during a parallel operation"))); - /* Allocate and zero buffer headers and auxiliary arrays */ LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc)); LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block)); @@ -937,6 +949,7 @@ GetLocalBufferStorage(void) this_buf = cur_block + next_buf_in_block * BLCKSZ; next_buf_in_block++; total_bufs_allocated++; + allocated_localbufs++; /* * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it @@ -1010,3 +1023,36 @@ AtProcExit_LocalBuffers(void) */ CheckForLocalBufferLeaks(); } + +/* + * Flush each temporary buffer page to the disk. + * + * It is costly operation needed solely to let temporary tables, indexes and + * 'toasts' participate in a parallel query plan. + */ +void +FlushAllLocalBuffers(void) +{ + int i; + + for (i = 0; i < NLocBuffer; i++) + { + BufferDesc *bufHdr = GetLocalBufferDescriptor(i); + uint32 buf_state; + + if (LocalBufHdrGetBlock(bufHdr) == NULL) + continue; + + buf_state = pg_atomic_read_u32(&bufHdr->state); + + /* XXX only valid dirty pages need to be flushed? */ + if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) + { + PinLocalBuffer(bufHdr, false); + FlushLocalBuffer(bufHdr, NULL); + UnpinLocalBuffer(BufferDescriptorGetBuffer(bufHdr)); + } + } + + Assert(dirtied_localbufs == 0); +} diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 559ba9cdb2cde..cf2e651289d50 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -2132,6 +2132,10 @@ RelationIdGetRelation(Oid relationId) Assert(rd->rd_isvalid || (rd->rd_isnailed && !criticalRelcachesBuilt)); } + + /* Consistency check to be paranoid introducing parallel temp scan. 
*/ + Assert(!(rd != NULL && RelationUsesLocalBuffers(rd) && IsParallelWorker() && dirtied_localbufs != 0)); + return rd; } @@ -2142,6 +2146,10 @@ RelationIdGetRelation(Oid relationId) rd = RelationBuildDesc(relationId, true); if (RelationIsValid(rd)) RelationIncrementReferenceCount(rd); + + /* Consistency check to be paranoid introducing parallel temp scan. */ + Assert(!(rd != NULL && RelationUsesLocalBuffers(rd) && IsParallelWorker() && dirtied_localbufs != 0)); + return rd; } diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 6b82a23435efe..f99a1bf5d9239 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -808,6 +808,18 @@ struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + + { + {"extended_parallel_processing", PGC_BACKEND, QUERY_TUNING_METHOD, + gettext_noop("Enables extra features of parallel processing."), + NULL, + GUC_EXPLAIN + }, + &extended_parallel_processing, + true, + NULL, NULL, NULL + }, + { {"enable_indexscan", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of index-scan plans."), @@ -3900,6 +3912,16 @@ struct config_real ConfigureNamesReal[] = DEFAULT_RANDOM_PAGE_COST, 0, DBL_MAX, NULL, NULL, NULL }, + { + {"write_page_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of flushing a disk page."), + NULL, + GUC_EXPLAIN + }, + &write_page_cost, + DEFAULT_WRITE_PAGE_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, + { {"cpu_tuple_cost", PGC_USERSET, QUERY_TUNING_COST, gettext_noop("Sets the planner's estimate of the cost of " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index d91133dbd7357..6637dc081ef76 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -432,11 +432,13 @@ #enable_group_by_reordering = on #enable_distinct_reordering = on #enable_self_join_elimination = 
on +#extended_parallel_processing = on # - Planner Cost Constants - #seq_page_cost = 1.0 # measured on an arbitrary scale #random_page_cost = 4.0 # same scale as above +#write_page_cost = 5.0 # same scale as above #cpu_tuple_cost = 0.01 # same scale as above #cpu_index_tuple_cost = 0.005 # same scale as above #cpu_operator_cost = 0.0025 # same scale as above diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 409e172bfb669..9fde40f65537a 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -738,6 +738,8 @@ typedef struct EState bool es_use_parallel_mode; /* can we use parallel workers? */ + bool es_tempbufs_flushed; /* Do we still need to flush dirty temp pages? */ + int es_parallel_workers_to_launch; /* number of workers to * launch. */ int es_parallel_workers_launched; /* number of workers actually diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 6eef1450e04eb..387cf900ce5c9 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -909,6 +909,10 @@ typedef struct RelOptInfo bool consider_param_startup; /* consider parallel paths? */ bool consider_parallel; + /* If the rel is allowed to be processed in parallel, does it need to flush + * temporary buffers? + */ + bool needs_temp_safety; /* * default result targetlist for Paths scanning this relation; list of @@ -1786,7 +1790,7 @@ typedef struct Path /* engage parallel-aware logic? */ bool parallel_aware; /* OK to use as part of parallel plan? */ - bool parallel_safe; + ParallelSafe parallel_safe; /* desired # of workers; 0 = not parallel */ int parallel_workers; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 4f59e30d62d5e..6a5af8c421841 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -185,7 +185,7 @@ typedef struct Plan /* engage parallel-aware logic? */ bool parallel_aware; /* OK to use as part of parallel plan? 
*/ - bool parallel_safe; + ParallelSafe parallel_safe; /* * information needed for asynchronous execution @@ -1286,6 +1286,8 @@ typedef struct Gather bool single_copy; /* suppress EXPLAIN display (for testing)? */ bool invisible; + /* Signal if any object with temporary storage is scanned in this subtree */ + bool process_temp_tables; /* * param id's of initplans which are referred at gather or one of its @@ -1325,6 +1327,9 @@ typedef struct GatherMerge /* NULLS FIRST/LAST directions */ bool *nullsFirst pg_node_attr(array_size(numCols)); + /* Signal if any objects with temporary storage are scanned in this subtree */ + bool process_temp_tables; + /* * param id's of initplans which are referred at gather merge or one of * its child nodes diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 6dfca3cb35ba5..c39c947b381cd 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -1037,6 +1037,18 @@ typedef struct SubLink ParseLoc location; /* token location, or -1 if unknown */ } SubLink; +/* + * Start from zero and put NEEDS_TEMP_FLUSH as a first positive value. + * In this case if someone still uses true/false values for this type it just + * causes more temp buffers flushes without an error. + */ +typedef enum ParallelSafe +{ + PARALLEL_UNSAFE = 0, + NEEDS_TEMP_FLUSH, + PARALLEL_SAFE, +} ParallelSafe; + /* * SubPlan - executable expression node for a subplan (sub-SELECT) * @@ -1100,7 +1112,7 @@ typedef struct SubPlan bool unknownEqFalse; /* true if it's okay to return FALSE when the * spec result is UNKNOWN; this allows much * simpler handling of null values */ - bool parallel_safe; /* is the subplan parallel-safe? */ + ParallelSafe parallel_safe; /* is the subplan parallel-safe? 
*/ /* Note: parallel_safe does not consider contents of testexpr or args */ /* Information for passing params into and out of the subselect: */ /* setParam and parParam are lists of integers (param IDs) */ diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 0dffec00ede93..c3d59101d41f1 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -33,7 +33,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause); extern bool contain_subplans(Node *clause); extern char max_parallel_hazard(Query *parse); -extern bool is_parallel_safe(PlannerInfo *root, Node *node); +extern bool is_parallel_safe(PlannerInfo *root, Node *node, bool *needs_temp_flush); extern bool contain_nonstrict_functions(Node *clause); extern bool contain_exec_param(Node *clause, List *param_ids); extern bool contain_leaked_vars(Node *clause); diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index d397fe27dc1e1..620ec3cc23d2c 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -23,6 +23,7 @@ /* If you change these, update backend/utils/misc/postgresql.conf.sample */ #define DEFAULT_SEQ_PAGE_COST 1.0 #define DEFAULT_RANDOM_PAGE_COST 4.0 +#define DEFAULT_WRITE_PAGE_COST 5.0 /* Make it a little more than random read. 
*/ #define DEFAULT_CPU_TUPLE_COST 0.01 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005 #define DEFAULT_CPU_OPERATOR_COST 0.0025 @@ -222,5 +223,6 @@ extern double compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel, Path *bitmapqual, double loop_count, Cost *cost_p, double *tuples_p); extern double compute_gather_rows(Path *path); +extern Cost tempbuf_flush_extra_cost(void); #endif /* COST_H */ diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h index 546828b54bd27..d19d0d500f2a4 100644 --- a/src/include/optimizer/optimizer.h +++ b/src/include/optimizer/optimizer.h @@ -81,6 +81,7 @@ extern Selectivity clauselist_selectivity_ext(PlannerInfo *root, /* widely used cost parameters */ extern PGDLLIMPORT double seq_page_cost; extern PGDLLIMPORT double random_page_cost; +extern PGDLLIMPORT double write_page_cost; extern PGDLLIMPORT double cpu_tuple_cost; extern PGDLLIMPORT double cpu_index_tuple_cost; extern PGDLLIMPORT double cpu_operator_cost; @@ -89,6 +90,12 @@ extern PGDLLIMPORT double parallel_setup_cost; extern PGDLLIMPORT double recursive_worktable_factor; extern PGDLLIMPORT int effective_cache_size; +/* + * Enable extended parallel query processing features, such as parallel + * scans of temporary tables.
+ */ +extern PGDLLIMPORT bool extended_parallel_processing; + extern double clamp_row_est(double nrows); extern int32 clamp_width_est(int64 tuple_width); extern long clamp_cardinality_to_long(Cardinality x); diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 9d3debcab2801..38235e160d53f 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -45,7 +45,7 @@ extern ForeignScan *make_foreignscan(List *qptlist, List *qpqual, List *fdw_scan_tlist, List *fdw_recheck_quals, Plan *outer_plan); extern Plan *change_plan_targetlist(Plan *subplan, List *tlist, - bool tlist_parallel_safe); + ParallelSafe tlist_parallel_safe); extern Plan *materialize_finished_plan(Plan *subplan); extern bool is_projection_capable_path(Path *path); extern bool is_projection_capable_plan(Plan *plan); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 0dec7d93b3b27..bb01a93ff4d72 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -484,6 +484,7 @@ extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits, bool release_aio); extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait); extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln); +extern void FlushAllLocalBuffers(void); extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced); extern void DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber forkNum, diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 41fdc1e76938e..a96e2cef696b8 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -181,6 +181,11 @@ extern PGDLLIMPORT char *BufferBlocks; /* in localbuf.c */ extern PGDLLIMPORT int NLocBuffer; + +/* Local buffer statistics */ +extern PGDLLIMPORT int allocated_localbufs; +extern PGDLLIMPORT int dirtied_localbufs; + extern PGDLLIMPORT Block 
*LocalBufferBlockPointers; extern PGDLLIMPORT int32 *LocalRefCount; @@ -268,6 +273,7 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum); extern void FlushOneBuffer(Buffer buffer); extern void FlushRelationBuffers(Relation rel); +extern void FlushAllBuffers(void); extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels); extern void CreateAndCopyRelationData(RelFileLocator src_rlocator, RelFileLocator dst_rlocator, diff --git a/src/test/regress/expected/temp.out b/src/test/regress/expected/temp.out index 370361543b30c..5f1124561a7af 100644 --- a/src/test/regress/expected/temp.out +++ b/src/test/regress/expected/temp.out @@ -566,3 +566,58 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp; -- cleanup DROP FUNCTION test_temp_pin(int, int); +-- Test visibility of a temporary table tuples in parallel workers +-- Although explain prints the number of workers planned and launched, In this +-- case it shouldn't cause test results float because the debugging +-- option force usage of at least single worker (normally no one is needed here +-- and we don't expect more than one worker. 
+CREATE TEMP TABLE test AS (SELECT x FROM generate_series(1,100) AS x); +VACUUM ANALYZE test; +SET max_parallel_workers_per_gather = 1; +SET write_page_cost = 0.0001; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +SET min_parallel_table_scan_size = 0; +SET min_parallel_index_scan_size = 0; +SET debug_parallel_query = 'on'; +-- Temp buffers will not be seen without flushing dirty buffers +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + QUERY PLAN +------------------------------------------------------------- + Gather (actual rows=100.00 loops=1) + Workers Planned: 1 + Workers Launched: 1 + -> Parallel Seq Scan on test (actual rows=50.00 loops=2) +(4 rows) + +-- Check temporary indexes too +CREATE INDEX idx1 ON test(x); +SET enable_seqscan = 'off'; +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + QUERY PLAN +------------------------------------------------------------------------------- + Gather (actual rows=100.00 loops=1) + Workers Planned: 1 + Workers Launched: 1 + -> Parallel Index Only Scan using idx1 on test (actual rows=50.00 loops=2) + Heap Fetches: 0 + Index Searches: 1 +(6 rows) + +RESET enable_seqscan; +-- a view doesn't have a storage - it shouldn't cause issues. 
+CREATE TEMP TABLE table_a (id integer); +CREATE TEMP VIEW view_a AS SELECT * FROM table_a; +SELECT view_a FROM view_a; + view_a +-------- +(0 rows) + +RESET debug_parallel_query; +RESET min_parallel_index_scan_size; +RESET min_parallel_table_scan_size; +RESET max_parallel_workers_per_gather; +RESET parallel_setup_cost; +RESET parallel_tuple_cost; diff --git a/src/test/regress/sql/temp.sql b/src/test/regress/sql/temp.sql index d50472ddced89..1abe7415ccd03 100644 --- a/src/test/regress/sql/temp.sql +++ b/src/test/regress/sql/temp.sql @@ -418,3 +418,43 @@ SELECT count(*), max(a) max_a, min(a) min_a, max(cnt) max_cnt FROM test_temp; -- cleanup DROP FUNCTION test_temp_pin(int, int); + +-- Test visibility of a temporary table tuples in parallel workers +-- Although explain prints the number of workers planned and launched, In this +-- case it shouldn't cause test results float because the debugging +-- option force usage of at least single worker (normally no one is needed here +-- and we don't expect more than one worker. +CREATE TEMP TABLE test AS (SELECT x FROM generate_series(1,100) AS x); +VACUUM ANALYZE test; + +SET max_parallel_workers_per_gather = 1; +SET write_page_cost = 0.0001; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +SET min_parallel_table_scan_size = 0; +SET min_parallel_index_scan_size = 0; +SET debug_parallel_query = 'on'; + +-- Temp buffers will not be seen without flushing dirty buffers +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + +-- Check temporary indexes too +CREATE INDEX idx1 ON test(x); +SET enable_seqscan = 'off'; +EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) +SELECT * FROM test; + +RESET enable_seqscan; + +-- a view doesn't have a storage - it shouldn't cause issues. 
+CREATE TEMP TABLE table_a (id integer); +CREATE TEMP VIEW view_a AS SELECT * FROM table_a; +SELECT view_a FROM view_a; + +RESET debug_parallel_query; +RESET min_parallel_index_scan_size; +RESET min_parallel_table_scan_size; +RESET max_parallel_workers_per_gather; +RESET parallel_setup_cost; +RESET parallel_tuple_cost;