Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
name: PR regression tests (REL_18_STABLE)

on:
pull_request:
branches:
- REL_18_STABLE

jobs:
build-and-test:
runs-on: ubuntu-latest

steps:
- name: Checkout PR branch
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Install build dependencies
run: |
sudo apt-get update -qq
sudo apt-get install -y -qq \
gcc make bison flex \
libreadline-dev zlib1g-dev libssl-dev libicu-dev \
liblz4-dev libzstd-dev \
pkg-config \
perl libipc-run-perl \
python3

- name: Configure
run: |
./configure \
--enable-cassert --enable-debug --enable-tap-tests \
--with-openssl --with-icu --with-lz4 --with-zstd

- name: Build
run: make -j$(nproc) world-bin

- name: Run check-world
run: make check-world

- name: Collect regression artifacts on failure
if: failure()
run: |
mkdir -p /tmp/regress-artifacts
find . \( -name regression.diffs -o -name regression.out \
-o -name postmaster.log -o -name initdb.log \
-o -path '*/tmp_check/log/*' \) \
-exec cp --parents {} /tmp/regress-artifacts/ \;

- name: Upload regression artifacts
if: failure()
uses: actions/upload-artifact@v4
with:
name: regression-artifacts
path: /tmp/regress-artifacts/
if-no-files-found: ignore

- name: Job summary
if: always()
run: |
if [ "${{ job.status }}" = "success" ]; then
echo "## ✅ check-world passed" >> "$GITHUB_STEP_SUMMARY"
else
echo "## ❌ check-world failed" >> "$GITHUB_STEP_SUMMARY"
echo "" >> "$GITHUB_STEP_SUMMARY"
echo "Download the **regression-artifacts** artifact for details." >> "$GITHUB_STEP_SUMMARY"
echo "" >> "$GITHUB_STEP_SUMMARY"
# Inline first diffs found, if any
for f in $(find . -name regression.diffs 2>/dev/null | head -5); do
echo "### ${f}" >> "$GITHUB_STEP_SUMMARY"
echo '```' >> "$GITHUB_STEP_SUMMARY"
head -100 "$f" >> "$GITHUB_STEP_SUMMARY"
echo '```' >> "$GITHUB_STEP_SUMMARY"
done
fi
4 changes: 4 additions & 0 deletions src/backend/executor/execParallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "jit/jit.h"
#include "nodes/nodeFuncs.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/datum.h"
#include "utils/dsa.h"
Expand Down Expand Up @@ -77,6 +78,7 @@ typedef struct FixedParallelExecutorState
dsa_pointer param_exec;
int eflags;
int jit_flags;
int dirtied_localbufs; /* Just for debugging purposes */
} FixedParallelExecutorState;

/*
Expand Down Expand Up @@ -756,6 +758,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
fpes->param_exec = InvalidDsaPointer;
fpes->eflags = estate->es_top_eflags;
fpes->jit_flags = estate->es_jit_flags;
fpes->dirtied_localbufs = dirtied_localbufs;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);

/* Store query string */
Expand Down Expand Up @@ -1442,6 +1445,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)

/* Get fixed-size state. */
fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
dirtied_localbufs = fpes->dirtied_localbufs;

/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
receiver = ExecParallelGetReceiver(seg, toc);
Expand Down
1 change: 1 addition & 0 deletions src/backend/executor/execUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ CreateExecutorState(void)
estate->es_use_parallel_mode = false;
estate->es_parallel_workers_to_launch = 0;
estate->es_parallel_workers_launched = 0;
estate->es_tempbufs_flushed = false; /* Is the backend's temp buffers were flushed? */

estate->es_jit_flags = 0;
estate->es_jit = NULL;
Expand Down
12 changes: 12 additions & 0 deletions src/backend/executor/nodeGather.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "storage/bufmgr.h"
#include "utils/wait_event.h"


Expand Down Expand Up @@ -161,6 +162,17 @@ ExecGather(PlanState *pstate)
{
ParallelContext *pcxt;

/*
 * Flush temporary buffers if this parallel section contains
 * any objects with a temporary storage type.  Don't bother to do it
 * more than once per query execution.
 */
if (gather->process_temp_tables && !estate->es_tempbufs_flushed)
{
FlushAllBuffers();
estate->es_tempbufs_flushed = true;
}

/* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei)
node->pei = ExecInitParallelPlan(outerPlanState(node),
Expand Down
8 changes: 8 additions & 0 deletions src/backend/executor/nodeGatherMerge.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "storage/bufmgr.h"

/*
* When we read tuples from workers, it's a good idea to read several at once
Expand Down Expand Up @@ -205,6 +206,13 @@ ExecGatherMerge(PlanState *pstate)
{
ParallelContext *pcxt;

/* Same reasoning as in ExecGather */
if (gm->process_temp_tables && !estate->es_tempbufs_flushed)
{
FlushAllBuffers();
estate->es_tempbufs_flushed = true;
}

/* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei)
node->pei = ExecInitParallelPlan(outerPlanState(node),
Expand Down
30 changes: 16 additions & 14 deletions src/backend/optimizer/path/allpaths.c
Original file line number Diff line number Diff line change
Expand Up @@ -601,23 +601,25 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
/* This should only be called for baserels and appendrel children. */
Assert(IS_SIMPLE_REL(rel));

/* Set if the data source refers temp storage somehow */
rel->needs_temp_safety = false;

/* Assorted checks based on rtekind. */
switch (rte->rtekind)
{
case RTE_RELATION:

/*
* Currently, parallel workers can't access the leader's temporary
* tables. We could possibly relax this if we wrote all of its
* local buffers at the start of the query and made no changes
* thereafter (maybe we could allow hint bit changes), and if we
* taught the workers to read them. Writing a large number of
* temporary buffers could be expensive, though, and we don't have
* the rest of the necessary infrastructure right now anyway. So
* for now, bail out if we see a temporary table.
* It is not free to process objects with a temporary storage in
* parallel because we need to flush temporary buffers beforehand.
* So, hide this feature under a GUC.
*/
if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP)
return;
{
if (!extended_parallel_processing)
return;
rel->needs_temp_safety = true;
}

/*
* Table sampling can be pushed down to workers if the sample
Expand All @@ -629,7 +631,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,

if (proparallel != PROPARALLEL_SAFE)
return;
if (!is_parallel_safe(root, (Node *) rte->tablesample->args))
if (!is_parallel_safe(root, (Node *) rte->tablesample->args, &rel->needs_temp_safety))
return;
}

Expand Down Expand Up @@ -695,7 +697,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,

case RTE_FUNCTION:
/* Check for parallel-restricted functions. */
if (!is_parallel_safe(root, (Node *) rte->functions))
if (!is_parallel_safe(root, (Node *) rte->functions, &rel->needs_temp_safety))
return;
break;

Expand All @@ -705,7 +707,7 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,

case RTE_VALUES:
/* Check for parallel-restricted functions. */
if (!is_parallel_safe(root, (Node *) rte->values_lists))
if (!is_parallel_safe(root, (Node *) rte->values_lists, &rel->needs_temp_safety))
return;
break;

Expand Down Expand Up @@ -746,14 +748,14 @@ set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
* outer join clauses work correctly. It would likely break equivalence
* classes, too.
*/
if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo))
if (!is_parallel_safe(root, (Node *) rel->baserestrictinfo, &rel->needs_temp_safety))
return;

/*
* Likewise, if the relation's outputs are not parallel-safe, give up.
* (Usually, they're just Vars, but sometimes they're not.)
*/
if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs))
if (!is_parallel_safe(root, (Node *) rel->reltarget->exprs, &rel->needs_temp_safety))
return;

/* We have a winner. */
Expand Down
46 changes: 44 additions & 2 deletions src/backend/optimizer/path/costsize.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
#include "optimizer/plancat.h"
#include "optimizer/restrictinfo.h"
#include "parser/parsetree.h"
#include "storage/bufmgr.h"
#include "utils/lsyscache.h"
#include "utils/selfuncs.h"
#include "utils/spccache.h"
Expand All @@ -129,6 +130,7 @@

double seq_page_cost = DEFAULT_SEQ_PAGE_COST;
double random_page_cost = DEFAULT_RANDOM_PAGE_COST;
double write_page_cost = DEFAULT_WRITE_PAGE_COST;
double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST;
double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST;
double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST;
Expand Down Expand Up @@ -164,6 +166,8 @@ bool enable_partition_pruning = true;
bool enable_presorted_aggregate = true;
bool enable_async_append = true;

bool extended_parallel_processing = true;

typedef struct
{
PlannerInfo *root;
Expand Down Expand Up @@ -462,6 +466,10 @@ cost_gather(GatherPath *path, PlannerInfo *root,

run_cost = path->subpath->total_cost - path->subpath->startup_cost;

/* Account for flushing dirty temp buffers before launching workers */
if (path->subpath->parallel_safe == NEEDS_TEMP_FLUSH)
startup_cost += tempbuf_flush_extra_cost();

/* Parallel setup and communication cost. */
startup_cost += parallel_setup_cost;
run_cost += parallel_tuple_cost * path->path.rows;
Expand Down Expand Up @@ -532,10 +540,22 @@ cost_gather_merge(GatherMergePath *path, PlannerInfo *root,
startup_cost += parallel_setup_cost;
run_cost += parallel_tuple_cost * path->path.rows * 1.05;

/*
* Fold in the input (subpath) costs, following the same pattern as
* cost_gather: input startup goes into startup_cost, the incremental
* portion goes into run_cost, so total = startup + run naturally.
*/
startup_cost += input_startup_cost;
run_cost += input_total_cost - input_startup_cost;

/* Account for flushing dirty temp buffers before launching workers */
if (path->subpath->parallel_safe == NEEDS_TEMP_FLUSH)
startup_cost += tempbuf_flush_extra_cost();

path->path.disabled_nodes = input_disabled_nodes
+ (enable_gathermerge ? 0 : 1);
path->path.startup_cost = startup_cost + input_startup_cost;
path->path.total_cost = (startup_cost + run_cost + input_total_cost);
path->path.startup_cost = startup_cost;
path->path.total_cost = (startup_cost + run_cost);
}

/*
Expand Down Expand Up @@ -6628,3 +6648,25 @@ compute_gather_rows(Path *path)

return clamp_row_est(path->rows * get_parallel_divisor(path));
}

/*
 * tempbuf_flush_extra_cost
 *		Estimate the cost of flushing all dirty local (temp-relation) buffers.
 *
 * Before launching parallel workers for a SELECT query, the leader process
 * must flush all dirty pages in its temporary buffers to guarantee that each
 * parallel worker sees the same data.  It seems difficult to calculate the
 * specific set of tables, indexes and TOAST relations that may be touched
 * inside the subtree; moreover, stored procedures may also scan temporary
 * tables.  So it makes sense to flush all temporary buffers.  Here we
 * estimate the cost of that operation, so that small queries do not choose
 * an expensive parallel scan over temp resources.
 */
Cost
tempbuf_flush_extra_cost(void)
{
	/* Fast exit if the feature is disabled */
	if (!extended_parallel_processing)
		return 0.0;

	/*
	 * We rely on up-to-date statistics of the number of dirtied local
	 * buffers; the counter must never go negative.
	 */
	Assert(dirtied_localbufs >= 0);

	return write_page_cost * dirtied_localbufs;
}
9 changes: 7 additions & 2 deletions src/backend/optimizer/path/equivclass.c
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ find_computable_ec_member(PlannerInfo *root,
{
List *emvars;
ListCell *lc2;
bool needs_temp_flush = false;

/*
* We shouldn't be trying to sort by an equivalence class that
Expand Down Expand Up @@ -1049,9 +1050,11 @@ find_computable_ec_member(PlannerInfo *root,
/*
* If requested, reject expressions that are not parallel-safe. We
* check this last because it's a rather expensive test.
* TODO: Not sure if it is really necessary.
*/
if (require_parallel_safe &&
!is_parallel_safe(root, (Node *) em->em_expr))
(!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) ||
needs_temp_flush))
continue;

return em; /* found usable expression */
Expand Down Expand Up @@ -1093,6 +1096,7 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel,
foreach(lc, target->exprs)
{
Expr *targetexpr = (Expr *) lfirst(lc);
bool needs_temp_flush = false;

em = find_ec_member_matching_expr(ec, targetexpr, rel->relids);
if (!em)
Expand All @@ -1112,7 +1116,8 @@ relation_can_be_sorted_early(PlannerInfo *root, RelOptInfo *rel,
* check this last because it's a rather expensive test.
*/
if (require_parallel_safe &&
!is_parallel_safe(root, (Node *) em->em_expr))
(!is_parallel_safe(root, (Node *) em->em_expr, &needs_temp_flush) ||
needs_temp_flush))
continue;

return true;
Expand Down
Loading
Loading