-
Notifications
You must be signed in to change notification settings - Fork 45
SPOC-311: Fix inconsistent LSN tracking during subscription SYNC. #351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,6 +58,9 @@ | |
| #include "utils/builtins.h" | ||
| #include "utils/fmgroids.h" | ||
| #include "utils/guc.h" | ||
| #if PG_VERSION_NUM >= 170000 | ||
| #include "utils/injection_point.h" | ||
| #endif | ||
| #include "utils/pg_lsn.h" | ||
| #include "utils/rel.h" | ||
| #include "utils/resowner.h" | ||
|
|
@@ -1203,13 +1206,54 @@ spock_sync_subscription(SpockSubscription *sub) | |
| origin_conn_repl = spock_connect_replica(sub->origin_if->dsn, | ||
| sub->name, "snap"); | ||
|
|
||
| progress_entries_list = adjust_progress_info(origin_conn); | ||
| snapshot = ensure_replication_slot_snapshot(origin_conn, | ||
| origin_conn_repl, | ||
| sub->slot_name, | ||
| use_failover_slot, &lsn); | ||
|
|
||
| PQfinish(origin_conn); | ||
| #if PG_VERSION_NUM >= 180000 | ||
| INJECTION_POINT("spock-before-sync-progress-read", "wait"); | ||
| #elif PG_VERSION_NUM >= 170000 | ||
| INJECTION_POINT("spock-before-sync-progress-read"); | ||
| #endif | ||
|
|
||
| /* | ||
| * Read progress info using the same snapshot that will be used for | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am very sorry, but I should confess that the phrase:
is totally unclear for me. We get LSN/TS values from the HTAB which is out of snapshot, right? |
||
| * COPY. This ensures that the LSN values we capture are consistent | ||
| * with the transactional snapshot: we see exactly the same committed | ||
| * transactions the COPY will include. Without this, transactions | ||
| * from other nodes could be committed between the progress read and | ||
| * the snapshot creation, causing the new node to either miss data | ||
| * or re-apply already-copied rows. | ||
| */ | ||
| { | ||
| PGresult *res; | ||
| PGresult *rollback_res; | ||
|
|
||
| PG_TRY(); | ||
| { | ||
| start_copy_origin_tx(origin_conn, snapshot); | ||
| progress_entries_list = adjust_progress_info(origin_conn); | ||
| res = PQexec(origin_conn, "ROLLBACK"); | ||
| if (PQresultStatus(res) != PGRES_COMMAND_OK) | ||
| elog(WARNING, "ROLLBACK on origin node failed: %s", | ||
| PQresultErrorMessage(res)); | ||
| PQclear(res); | ||
| } | ||
| PG_CATCH(); | ||
| { | ||
| rollback_res = PQexec(origin_conn, "ROLLBACK"); | ||
| if (PQresultStatus(rollback_res) != PGRES_COMMAND_OK) | ||
| elog(WARNING, "ROLLBACK on origin node failed: %s", | ||
| PQresultErrorMessage(rollback_res)); | ||
| PQclear(rollback_res); | ||
| PQfinish(origin_conn); | ||
| PG_RE_THROW(); | ||
| } | ||
| PG_END_TRY(); | ||
|
|
||
| PQfinish(origin_conn); | ||
| } | ||
|
|
||
| PG_ENSURE_ERROR_CLEANUP(spock_sync_worker_cleanup_error_cb, | ||
| PointerGetDatum(sub)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| use strict; | ||
| use warnings; | ||
| use Test::More tests => 34; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This commit doesn't change number of checks in the file (34). So, why it hides last four checks? |
||
| use Test::More tests => 30; | ||
|
ibrarahmad marked this conversation as resolved.
|
||
| use IPC::Run; | ||
| use lib '.'; | ||
| use lib 't'; | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,119 @@ | ||||||||||||||||||||||||||||||||||||||||||
| use strict; | ||||||||||||||||||||||||||||||||||||||||||
| use warnings; | ||||||||||||||||||||||||||||||||||||||||||
| use Test::More; | ||||||||||||||||||||||||||||||||||||||||||
| use IPC::Run; | ||||||||||||||||||||||||||||||||||||||||||
| use lib '.'; | ||||||||||||||||||||||||||||||||||||||||||
| use lib 't'; | ||||||||||||||||||||||||||||||||||||||||||
| use SpockTest qw(create_cluster destroy_cluster system_or_bail get_test_config cross_wire psql_or_bail scalar_query); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| # ============================================================================= | ||||||||||||||||||||||||||||||||||||||||||
| # Test: Verify that progress info is read consistently with the COPY snapshot | ||||||||||||||||||||||||||||||||||||||||||
| # during subscription sync. | ||||||||||||||||||||||||||||||||||||||||||
| # | ||||||||||||||||||||||||||||||||||||||||||
| # Uses injection points to insert a delay between the replication slot | ||||||||||||||||||||||||||||||||||||||||||
| # snapshot creation and the progress read. During that delay, a DML from | ||||||||||||||||||||||||||||||||||||||||||
| # N2 arrives at N1. With the fix (progress read inside the snapshot), | ||||||||||||||||||||||||||||||||||||||||||
| # the progress entries will be consistent despite the delay. | ||||||||||||||||||||||||||||||||||||||||||
| # ============================================================================= | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| my $configure_options = `pg_config --configure`; | ||||||||||||||||||||||||||||||||||||||||||
| if ($configure_options !~ /enable-injection-points/) { | ||||||||||||||||||||||||||||||||||||||||||
| plan skip_all => 'Injection points not supported by this build'; | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| plan tests => 15; | ||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Count ok() / pass/fail assertions emitted by SpockTest helpers used in this file,
# and verify the total matches `plan tests => 15`.
# 1. How many ok() calls in this test file itself
echo "=== Explicit ok() calls in 018_inject_sync_progress.pl ==="
rg -n '\bok\s*\(' tests/tap/t/018_inject_sync_progress.pl
# 2. Find SpockTest module
echo -e "\n=== Finding SpockTest module ==="
fd -e pm | grep -i spocktest
# 3. Check what assertions SpockTest helpers emit
echo -e "\n=== Assertions inside create_cluster ==="
rg -n -A10 'sub create_cluster' tests/tap/SpockTest.pm || echo "Not found in default location"
echo -e "\n=== Assertions inside cross_wire ==="
rg -n -A10 'sub cross_wire' tests/tap/SpockTest.pm || echo "Not found in default location"
echo -e "\n=== Assertions inside destroy_cluster ==="
rg -n -A10 'sub destroy_cluster' tests/tap/SpockTest.pm || echo "Not found in default location"
echo -e "\n=== Total ok/pass/fail/is calls in SpockTest.pm ==="
rg -c '\b(?:ok|pass|fail|is|like|unlike|cmp_ok)\s*\(' tests/tap/SpockTest.pm || echo "File not found"Repository: pgEdge/spock Length of output: 1149 🏁 Script executed: #!/bin/bash
# Check the SpockTest helpers at the correct path
echo "=== Assertions inside create_cluster ==="
rg -n -A15 'sub create_cluster' tests/tap/t/SpockTest.pm
echo -e "\n=== Assertions inside cross_wire ==="
rg -n -A15 'sub cross_wire' tests/tap/t/SpockTest.pm
echo -e "\n=== Assertions inside destroy_cluster ==="
rg -n -A15 'sub destroy_cluster' tests/tap/t/SpockTest.pm
echo -e "\n=== Total assertion calls in SpockTest.pm ==="
rg '\b(?:ok|pass|fail|is|like|unlike|cmp_ok)\s*\(' tests/tap/t/SpockTest.pm | wc -lRepository: pgEdge/spock Length of output: 3174 🏁 Script executed: #!/bin/bash
# Check which SpockTest helpers are called in 018_inject_sync_progress.pl
echo "=== Calls to SpockTest helpers in 018_inject_sync_progress.pl ==="
rg -n 'create_cluster|cross_wire|destroy_cluster' tests/tap/t/018_inject_sync_progress.pl
# Get full content of 018_inject_sync_progress.pl to understand structure
echo -e "\n=== Full content of 018_inject_sync_progress.pl ==="
cat tests/tap/t/018_inject_sync_progress.pl
# Check all ok() calls in SpockTest.pm with context
echo -e "\n=== All assertion calls in SpockTest.pm with context ==="
rg -n -B2 -A2 '\b(?:ok|pass|fail)\s*\(' tests/tap/t/SpockTest.pmRepository: pgEdge/spock Length of output: 6974 Fix plan count mismatch. The plan declares 15 tests, but only 9 assertions are generated:
Update 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| create_cluster(3, 'Create initial 3-node Spock test cluster'); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| my ($ret1, $ret2, $ret3, $lsn1, $lsn2, $lsn3); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| my $config = get_test_config(); | ||||||||||||||||||||||||||||||||||||||||||
| my $node_count = $config->{node_count}; | ||||||||||||||||||||||||||||||||||||||||||
| my $node_ports = $config->{node_ports}; | ||||||||||||||||||||||||||||||||||||||||||
| my $host = $config->{host}; | ||||||||||||||||||||||||||||||||||||||||||
| my $dbname = $config->{db_name}; | ||||||||||||||||||||||||||||||||||||||||||
| my $db_user = $config->{db_user}; | ||||||||||||||||||||||||||||||||||||||||||
| my $db_password = $config->{db_password}; | ||||||||||||||||||||||||||||||||||||||||||
| my $pg_bin = $config->{pg_bin}; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| cross_wire(2, ['n1', 'n2'], 'Cross-wire nodes N1 and N2'); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| print STDERR "Install preparatory stuff and wait until it will be propagated\n"; | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "ALTER SYSTEM SET spock.exception_behaviour = 'sub_disable'"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "SELECT pg_reload_conf()"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "CREATE EXTENSION injection_points"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(1, "CREATE TABLE t1 (x bigint PRIMARY KEY)"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(1, "INSERT INTO t1 (x) VALUES (42)"); | ||||||||||||||||||||||||||||||||||||||||||
| $lsn1 = scalar_query(1, "SELECT spock.sync_event()"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(2, "CALL spock.wait_for_sync_event(true, 'n1', '$lsn1'::pg_lsn, 600)"); | ||||||||||||||||||||||||||||||||||||||||||
| print STDERR "---> LSN1: $lsn1\n"; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| # Create N2 -> N3 disabled subscription and its slot manually. | ||||||||||||||||||||||||||||||||||||||||||
| # Then install an injection point on N3 to delay the N1 -> N3 sync. | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "SELECT spock.sub_create(subscription_name := 'n2_n3', | ||||||||||||||||||||||||||||||||||||||||||
| provider_dsn := 'host=$host dbname=$dbname port=$node_ports->[1] user=$db_user password=$db_password', | ||||||||||||||||||||||||||||||||||||||||||
| enabled := false);"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(2, "SELECT 1 FROM pg_create_logical_replication_slot( | ||||||||||||||||||||||||||||||||||||||||||
| 'spk_${dbname}_n2_n2_n3', 'spock_output')"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "SELECT injection_points_attach( | ||||||||||||||||||||||||||||||||||||||||||
| 'spock-before-sync-progress-read', 'wait')"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "SELECT spock.sub_create(subscription_name := 'n1_n3', | ||||||||||||||||||||||||||||||||||||||||||
| provider_dsn := 'host=$host dbname=$dbname port=$node_ports->[0] user=$db_user password=$db_password', | ||||||||||||||||||||||||||||||||||||||||||
| synchronize_structure := true, synchronize_data := true, enabled := true);"); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| # Wait until the sync worker on N3 has reached the injection point (poll | ||||||||||||||||||||||||||||||||||||||||||
| # pg_stat_activity for the InjectionPoint wait event), with up to 60s timeout. | ||||||||||||||||||||||||||||||||||||||||||
| my $waited = 0; | ||||||||||||||||||||||||||||||||||||||||||
| while ($waited < 60) { | ||||||||||||||||||||||||||||||||||||||||||
| my $hit = scalar_query(3, | ||||||||||||||||||||||||||||||||||||||||||
| "SELECT 1 FROM pg_stat_activity WHERE wait_event = 'InjectionPoint' AND datname = current_database()"); | ||||||||||||||||||||||||||||||||||||||||||
| last if ($hit eq '1'); | ||||||||||||||||||||||||||||||||||||||||||
| sleep 1; | ||||||||||||||||||||||||||||||||||||||||||
| $waited++; | ||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||
| ok($waited < 60, "Sync worker reached injection point within 60s"); | ||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+66
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In
🐛 Proposed fix- my $hit = scalar_query(3,
- "SELECT 1 FROM pg_stat_activity WHERE wait_event = 'InjectionPoint' AND datname = current_database()");
+ my $hit = scalar_query(3,
+ "SELECT 1 FROM pg_stat_activity WHERE wait_event_type = 'InjectionPoint'"
+ . " AND wait_event = 'spock-before-sync-progress-read'"
+ . " AND datname = current_database()");📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| # While N3 is paused at the injection point (after snapshot, before progress | ||||||||||||||||||||||||||||||||||||||||||
| # read), update data on N2. This will be replicated to N1 and included in the | ||||||||||||||||||||||||||||||||||||||||||
| # COPY snapshot. With the fix, the progress read also uses the same snapshot, | ||||||||||||||||||||||||||||||||||||||||||
| # so the LSN will correctly reflect this transaction. | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(2, "UPDATE t1 SET x = x + 1"); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| $lsn2 = scalar_query(2, "SELECT spock.sync_event()"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(1, "CALL spock.wait_for_sync_event(true, 'n2', '$lsn2'::pg_lsn, 600)"); | ||||||||||||||||||||||||||||||||||||||||||
| print STDERR "---> LSN2: $lsn2\n"; | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| # Wake up N1 -> N3 subscription and wait until it becomes ready | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "SELECT injection_points_wakeup('spock-before-sync-progress-read');"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "SELECT injection_points_detach('spock-before-sync-progress-read');"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(1, "SELECT spock.wait_slot_confirm_lsn(NULL, NULL)"); | ||||||||||||||||||||||||||||||||||||||||||
| $lsn1 = scalar_query(1, "SELECT spock.sync_event()"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "CALL spock.wait_for_sync_event(true, 'n1', '$lsn1'::pg_lsn, 600)"); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| # Advance the N2->N3 slot using the progress LSN from N3, then enable the sub | ||||||||||||||||||||||||||||||||||||||||||
| $lsn3 = scalar_query(3, "SELECT remote_commit_lsn FROM spock.progress p | ||||||||||||||||||||||||||||||||||||||||||
| JOIN spock.node n ON (p.remote_node_id = n.node_id) | ||||||||||||||||||||||||||||||||||||||||||
| WHERE n.node_name = 'n2';"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(2, "SELECT pg_replication_slot_advance('spk_${dbname}_n2_n2_n3', '$lsn3'::pg_lsn)"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "SELECT spock.sub_enable('n2_n3', immediate := false)"); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| $lsn2 = scalar_query(2, "SELECT spock.sync_event()"); | ||||||||||||||||||||||||||||||||||||||||||
| psql_or_bail(3, "CALL spock.wait_for_sync_event(true, 'n2', '$lsn2'::pg_lsn, 600)"); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| print STDERR "Check aggregates on all of the nodes\n"; | ||||||||||||||||||||||||||||||||||||||||||
| $ret1 = scalar_query(1, "SELECT sum(x), count(*) FROM t1"); | ||||||||||||||||||||||||||||||||||||||||||
| $ret2 = scalar_query(2, "SELECT sum(x), count(*) FROM t1"); | ||||||||||||||||||||||||||||||||||||||||||
| $ret3 = scalar_query(3, "SELECT sum(x), count(*) FROM t1"); | ||||||||||||||||||||||||||||||||||||||||||
| print STDERR "Aggregates: N1=$ret1 | N2=$ret2 | N3=$ret3\n"; | ||||||||||||||||||||||||||||||||||||||||||
| ok($ret1 eq $ret2, "Equality of the data on N1 and N2 is confirmed"); | ||||||||||||||||||||||||||||||||||||||||||
| ok($ret1 eq $ret3, "Equality of the data on N1 and N3 is confirmed"); | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| print STDERR "Check that all existing subscriptions are enabled\n"; | ||||||||||||||||||||||||||||||||||||||||||
| $ret1 = scalar_query(1, "SELECT count(*) FROM spock.subscription WHERE sub_enabled = false;"); | ||||||||||||||||||||||||||||||||||||||||||
| $ret2 = scalar_query(2, "SELECT count(*) FROM spock.subscription WHERE sub_enabled = false;"); | ||||||||||||||||||||||||||||||||||||||||||
| $ret3 = scalar_query(3, "SELECT count(*) FROM spock.subscription WHERE sub_enabled = false;"); | ||||||||||||||||||||||||||||||||||||||||||
| ok($ret1 eq '0', "All subscriptions on the node N1 are active"); | ||||||||||||||||||||||||||||||||||||||||||
| ok($ret2 eq '0', "All subscriptions on the node N2 are active"); | ||||||||||||||||||||||||||||||||||||||||||
| ok($ret3 eq '0', "All subscriptions on the node N3 are active"); | ||||||||||||||||||||||||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| destroy_cluster('Destroy 3-node cluster'); | ||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.