Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions src/spock_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 170000
#include "utils/injection_point.h"
Comment thread
coderabbitai[bot] marked this conversation as resolved.
#endif
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/resowner.h"
Expand Down Expand Up @@ -1203,13 +1206,54 @@ spock_sync_subscription(SpockSubscription *sub)
origin_conn_repl = spock_connect_replica(sub->origin_if->dsn,
sub->name, "snap");

progress_entries_list = adjust_progress_info(origin_conn);
snapshot = ensure_replication_slot_snapshot(origin_conn,
origin_conn_repl,
sub->slot_name,
use_failover_slot, &lsn);

PQfinish(origin_conn);
#if PG_VERSION_NUM >= 180000
INJECTION_POINT("spock-before-sync-progress-read", "wait");
#elif PG_VERSION_NUM >= 170000
INJECTION_POINT("spock-before-sync-progress-read");
#endif

/*
* Read progress info using the same snapshot that will be used for
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am very sorry, but I should confess that the phrase:

Read progress info using the same snapshot

is totally unclear for me. We get LSN/TS values from the HTAB which is out of snapshot, right?
So, if in between the calls of 'ensure_replication_slot_snapshot' and 'adjust_progress_info' some remote transactions will be applied by an apply worker we will have LSN_X of the commit that is not in the snapshot. Having starting replication from this LSN_X we loose changes made in between COPY Snapshot and this LSN, isn't it?

* COPY. This ensures that the LSN values we capture are consistent
* with the transactional snapshot: we see exactly the same committed
* transactions the COPY will include. Without this, transactions
* from other nodes could be committed between the progress read and
* the snapshot creation, causing the new node to either miss data
* or re-apply already-copied rows.
*/
{
PGresult *res;
PGresult *rollback_res;

PG_TRY();
{
start_copy_origin_tx(origin_conn, snapshot);
progress_entries_list = adjust_progress_info(origin_conn);
res = PQexec(origin_conn, "ROLLBACK");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
elog(WARNING, "ROLLBACK on origin node failed: %s",
PQresultErrorMessage(res));
PQclear(res);
}
PG_CATCH();
{
rollback_res = PQexec(origin_conn, "ROLLBACK");
if (PQresultStatus(rollback_res) != PGRES_COMMAND_OK)
elog(WARNING, "ROLLBACK on origin node failed: %s",
PQresultErrorMessage(rollback_res));
PQclear(rollback_res);
PQfinish(origin_conn);
PG_RE_THROW();
}
PG_END_TRY();

PQfinish(origin_conn);
}

PG_ENSURE_ERROR_CLEANUP(spock_sync_worker_cleanup_error_cb,
PointerGetDatum(sub));
Expand Down
1 change: 1 addition & 0 deletions tests/tap/schedule
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ test: 015_skip_lsn
test: 015_forward_origin_advance
test: 016_crash_recovery_progress
test: 017_zodan_3n_timeout
test: 018_inject_sync_progress
2 changes: 1 addition & 1 deletion tests/tap/t/011_zodan_sync_third.pl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use strict;
use warnings;
use Test::More tests => 34;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit doesn't change number of checks in the file (34). So, why it hides last four checks?

use Test::More tests => 30;
Comment thread
ibrarahmad marked this conversation as resolved.
use IPC::Run;
use lib '.';
use lib 't';
Expand Down
119 changes: 119 additions & 0 deletions tests/tap/t/018_inject_sync_progress.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use strict;
use warnings;
use Test::More;
use IPC::Run;
use lib '.';
use lib 't';
use SpockTest qw(create_cluster destroy_cluster system_or_bail get_test_config cross_wire psql_or_bail scalar_query);

# =============================================================================
# Test: Verify that progress info is read consistently with the COPY snapshot
# during subscription sync.
#
# Uses injection points to insert a delay between the replication slot
# snapshot creation and the progress read. During that delay, a DML from
# N2 arrives at N1. With the fix (progress read inside the snapshot),
# the progress entries will be consistent despite the delay.
# =============================================================================

my $configure_options = `pg_config --configure`;
if ($configure_options !~ /enable-injection-points/) {
plan skip_all => 'Injection points not supported by this build';
}

plan tests => 15;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Count ok() / pass/fail assertions emitted by SpockTest helpers used in this file,
# and verify the total matches `plan tests => 15`.

# 1. How many ok() calls in this test file itself
echo "=== Explicit ok() calls in 018_inject_sync_progress.pl ==="
rg -n '\bok\s*\(' tests/tap/t/018_inject_sync_progress.pl

# 2. Find SpockTest module
echo -e "\n=== Finding SpockTest module ==="
fd -e pm | grep -i spocktest

# 3. Check what assertions SpockTest helpers emit
echo -e "\n=== Assertions inside create_cluster ==="
rg -n -A10 'sub create_cluster' tests/tap/SpockTest.pm || echo "Not found in default location"

echo -e "\n=== Assertions inside cross_wire ==="
rg -n -A10 'sub cross_wire' tests/tap/SpockTest.pm || echo "Not found in default location"

echo -e "\n=== Assertions inside destroy_cluster ==="
rg -n -A10 'sub destroy_cluster' tests/tap/SpockTest.pm || echo "Not found in default location"

echo -e "\n=== Total ok/pass/fail/is calls in SpockTest.pm ==="
rg -c '\b(?:ok|pass|fail|is|like|unlike|cmp_ok)\s*\(' tests/tap/SpockTest.pm || echo "File not found"

Repository: pgEdge/spock

Length of output: 1149


🏁 Script executed:

#!/bin/bash
# Check the SpockTest helpers at the correct path
echo "=== Assertions inside create_cluster ==="
rg -n -A15 'sub create_cluster' tests/tap/t/SpockTest.pm

echo -e "\n=== Assertions inside cross_wire ==="
rg -n -A15 'sub cross_wire' tests/tap/t/SpockTest.pm

echo -e "\n=== Assertions inside destroy_cluster ==="
rg -n -A15 'sub destroy_cluster' tests/tap/t/SpockTest.pm

echo -e "\n=== Total assertion calls in SpockTest.pm ==="
rg '\b(?:ok|pass|fail|is|like|unlike|cmp_ok)\s*\(' tests/tap/t/SpockTest.pm | wc -l

Repository: pgEdge/spock

Length of output: 3174


🏁 Script executed:

#!/bin/bash
# Check which SpockTest helpers are called in 018_inject_sync_progress.pl
echo "=== Calls to SpockTest helpers in 018_inject_sync_progress.pl ==="
rg -n 'create_cluster|cross_wire|destroy_cluster' tests/tap/t/018_inject_sync_progress.pl

# Get full content of 018_inject_sync_progress.pl to understand structure
echo -e "\n=== Full content of 018_inject_sync_progress.pl ==="
cat tests/tap/t/018_inject_sync_progress.pl

# Check all ok() calls in SpockTest.pm with context
echo -e "\n=== All assertion calls in SpockTest.pm with context ==="
rg -n -B2 -A2 '\b(?:ok|pass|fail)\s*\(' tests/tap/t/SpockTest.pm

Repository: pgEdge/spock

Length of output: 6974


Fix plan count mismatch.

The plan declares 15 tests, but only 9 assertions are generated:

  • 6 explicit ok() calls (lines 74, 108, 109, 115, 116, 117)
  • 3 pass() calls from SpockTest helpers:
    • create_cluster() emits 1 assertion
    • cross_wire() emits 1 assertion
    • destroy_cluster() emits 1 assertion

Update plan tests => 15; to plan tests => 9; or add 6 more assertions to match the declared count.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/tap/t/018_inject_sync_progress.pl` at line 24, The test plan declares
15 tests but only yields 9 assertions; update the Plan or add assertions: either
change the "plan tests => 15;" line to "plan tests => 9;" to match the actual
assertions, or add six additional assertions (e.g., more ok() calls or explicit
pass() checks) so the total equals 15; ensure any added assertions are placed
where appropriate relative to the existing ok() calls and helper-generated
assertions from create_cluster(), cross_wire(), and destroy_cluster().


create_cluster(3, 'Create initial 3-node Spock test cluster');

my ($ret1, $ret2, $ret3, $lsn1, $lsn2, $lsn3);

my $config = get_test_config();
my $node_count = $config->{node_count};
my $node_ports = $config->{node_ports};
my $host = $config->{host};
my $dbname = $config->{db_name};
my $db_user = $config->{db_user};
my $db_password = $config->{db_password};
my $pg_bin = $config->{pg_bin};

cross_wire(2, ['n1', 'n2'], 'Cross-wire nodes N1 and N2');

print STDERR "Install preparatory stuff and wait until it will be propagated\n";
psql_or_bail(3, "ALTER SYSTEM SET spock.exception_behaviour = 'sub_disable'");
psql_or_bail(3, "SELECT pg_reload_conf()");
psql_or_bail(3, "CREATE EXTENSION injection_points");
psql_or_bail(1, "CREATE TABLE t1 (x bigint PRIMARY KEY)");
psql_or_bail(1, "INSERT INTO t1 (x) VALUES (42)");
$lsn1 = scalar_query(1, "SELECT spock.sync_event()");
psql_or_bail(2, "CALL spock.wait_for_sync_event(true, 'n1', '$lsn1'::pg_lsn, 600)");
print STDERR "---> LSN1: $lsn1\n";

# Create N2 -> N3 disabled subscription and its slot manually.
# Then install an injection point on N3 to delay the N1 -> N3 sync.
psql_or_bail(3, "SELECT spock.sub_create(subscription_name := 'n2_n3',
provider_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password',
enabled := false);");
psql_or_bail(2, "SELECT 1 FROM pg_create_logical_replication_slot(
'spk_${dbname}_n2_n2_n3', 'spock_output')");
psql_or_bail(3, "SELECT injection_points_attach(
'spock-before-sync-progress-read', 'wait')");
psql_or_bail(3, "SELECT spock.sub_create(subscription_name := 'n1_n3',
provider_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password',
synchronize_structure := true, synchronize_data := true, enabled := true);");

# Wait until the sync worker on N3 has reached the injection point (poll
# pg_stat_activity for the InjectionPoint wait event), with up to 60s timeout.
my $waited = 0;
while ($waited < 60) {
my $hit = scalar_query(3,
"SELECT 1 FROM pg_stat_activity WHERE wait_event = 'InjectionPoint' AND datname = current_database()");
last if ($hit eq '1');
sleep 1;
$waited++;
}
ok($waited < 60, "Sync worker reached injection point within 60s");
Comment on lines +66 to +74
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

wait_event = 'InjectionPoint' is the wrong column — polling loop will always time out.

In pg_stat_activity, wait_event_type is the wait category and wait_event is the specific reason a client is waiting for. When a backend is blocked at an injection point named spock-before-sync-progress-read, PostgreSQL sets wait_event_type = 'InjectionPoint' and wait_event = 'spock-before-sync-progress-read'. The condition on line 69 tests wait_event = 'InjectionPoint', which will never match any injection-point wait — it would only match if there were a point literally named 'InjectionPoint'. As a result, $waited always reaches 60, so:

  1. ok($waited < 60, ...) at line 74 always emits not ok.
  2. The 60-second sleep lets the sync worker advance past the injection point before the UPDATE on line 80, making the test vacuous.
🐛 Proposed fix
-    my $hit = scalar_query(3,
-        "SELECT 1 FROM pg_stat_activity WHERE wait_event = 'InjectionPoint' AND datname = current_database()");
+    my $hit = scalar_query(3,
+        "SELECT 1 FROM pg_stat_activity WHERE wait_event_type = 'InjectionPoint'"
+        . " AND wait_event = 'spock-before-sync-progress-read'"
+        . " AND datname = current_database()");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
my $waited = 0;
while ($waited < 60) {
my $hit = scalar_query(3,
"SELECT 1 FROM pg_stat_activity WHERE wait_event = 'InjectionPoint' AND datname = current_database()");
last if ($hit eq '1');
sleep 1;
$waited++;
}
ok($waited < 60, "Sync worker reached injection point within 60s");
my $waited = 0;
while ($waited < 60) {
my $hit = scalar_query(3,
"SELECT 1 FROM pg_stat_activity WHERE wait_event_type = 'InjectionPoint'"
. " AND wait_event = 'spock-before-sync-progress-read'"
. " AND datname = current_database()");
last if ($hit eq '1');
sleep 1;
$waited++;
}
ok($waited < 60, "Sync worker reached injection point within 60s");
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/tap/t/018_inject_sync_progress.pl` around lines 66 - 74, The SQL check
in the polling loop uses the wrong column: change the scalar_query call inside
the while loop (the SELECT that currently tests wait_event = 'InjectionPoint')
to instead check wait_event_type = 'InjectionPoint' and wait_event =
'spock-before-sync-progress-read' so the loop actually detects the backend
blocked at the named injection point; keep the loop structure and $waited
variable as-is and only update the WHERE clause in the scalar_query invocation
so ok($waited < 60, ...) behaves as intended.


# While N3 is paused at the injection point (after snapshot, before progress
# read), update data on N2. This will be replicated to N1 and included in the
# COPY snapshot. With the fix, the progress read also uses the same snapshot,
# so the LSN will correctly reflect this transaction.
psql_or_bail(2, "UPDATE t1 SET x = x + 1");

$lsn2 = scalar_query(2, "SELECT spock.sync_event()");
psql_or_bail(1, "CALL spock.wait_for_sync_event(true, 'n2', '$lsn2'::pg_lsn, 600)");
print STDERR "---> LSN2: $lsn2\n";

# Wake up N1 -> N3 subscription and wait until it becomes ready
psql_or_bail(3, "SELECT injection_points_wakeup('spock-before-sync-progress-read');");
psql_or_bail(3, "SELECT injection_points_detach('spock-before-sync-progress-read');");
psql_or_bail(1, "SELECT spock.wait_slot_confirm_lsn(NULL, NULL)");
$lsn1 = scalar_query(1, "SELECT spock.sync_event()");
psql_or_bail(3, "CALL spock.wait_for_sync_event(true, 'n1', '$lsn1'::pg_lsn, 600)");

# Advance the N2->N3 slot using the progress LSN from N3, then enable the sub
$lsn3 = scalar_query(3, "SELECT remote_commit_lsn FROM spock.progress p
JOIN spock.node n ON (p.remote_node_id = n.node_id)
WHERE n.node_name = 'n2';");
psql_or_bail(2, "SELECT pg_replication_slot_advance('spk_${dbname}_n2_n2_n3', '$lsn3'::pg_lsn)");
psql_or_bail(3, "SELECT spock.sub_enable('n2_n3', immediate := false)");

$lsn2 = scalar_query(2, "SELECT spock.sync_event()");
psql_or_bail(3, "CALL spock.wait_for_sync_event(true, 'n2', '$lsn2'::pg_lsn, 600)");

print STDERR "Check aggregates on all of the nodes\n";
$ret1 = scalar_query(1, "SELECT sum(x), count(*) FROM t1");
$ret2 = scalar_query(2, "SELECT sum(x), count(*) FROM t1");
$ret3 = scalar_query(3, "SELECT sum(x), count(*) FROM t1");
print STDERR "Aggregates: N1=$ret1 | N2=$ret2 | N3=$ret3\n";
ok($ret1 eq $ret2, "Equality of the data on N1 and N2 is confirmed");
ok($ret1 eq $ret3, "Equality of the data on N1 and N3 is confirmed");

print STDERR "Check that all existing subscriptions are enabled\n";
$ret1 = scalar_query(1, "SELECT count(*) FROM spock.subscription WHERE sub_enabled = false;");
$ret2 = scalar_query(2, "SELECT count(*) FROM spock.subscription WHERE sub_enabled = false;");
$ret3 = scalar_query(3, "SELECT count(*) FROM spock.subscription WHERE sub_enabled = false;");
ok($ret1 eq '0', "All subscriptions on the node N1 are active");
ok($ret2 eq '0', "All subscriptions on the node N2 are active");
ok($ret3 eq '0', "All subscriptions on the node N3 are active");
Comment thread
coderabbitai[bot] marked this conversation as resolved.

destroy_cluster('Destroy 3-node cluster');