From 66bde0fbcee53c4c690bc6fd82ab4bd83288f3db Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Thu, 19 Feb 2026 13:37:14 +0100 Subject: [PATCH 1/7] Add per-subscription conflict statistics using PG18 custom pgstat kind Register a custom pgstat kind (SPOCK_PGSTAT_KIND_LRCONFLICTS) via the PG18 pgstat_register_kind() infrastructure to track replication conflict counts per subscription. Unlike vanilla PostgreSQL subscription stats, spock statistics use MyDatabaseId as the key because spock node, origin, and subscription IDs are unique only within a database. The implementation introduces spock-specific stat types (Spock_Stat_StatSubEntry, Spock_Stat_PendingSubEntry) sized to SPOCK_CONFLICT_NUM_TYPES (6 types, excluding SPOCK_CT_DELETE_LATE which is not yet tracked). Column names in the SQL-callable function are kept in sync with the SpockConflictType enum via designated initializers in SpockConflictStatColNames[]. Currently only SPOCK_CT_UPDATE_MISSING conflicts are actively reported (from spock_apply_heap.c). Reporting from spock_report_conflict() is left as a TODO until SPOCK_CT_DELETE_LATE is either included in the stats array or filtered out to prevent overflow. SQL functions: spock.get_subscription_stats(oid), spock.reset_subscription_stats(oid). Includes a regression test exercising UPDATE_MISSING counting, counter increment, and stats reset. --- Makefile | 8 +- include/spock_conflict.h | 8 +- include/spock_conflict_stat.h | 46 ++++ sql/spock--6.0.0-devel.sql | 23 ++ src/spock.c | 8 + src/spock_apply_heap.c | 12 ++ src/spock_conflict.c | 15 ++ src/spock_conflict_stat.c | 255 +++++++++++++++++++++++ src/spock_functions.c | 9 + tests/regress/expected/conflict_stat.out | 142 +++++++++++++ tests/regress/sql/conflict_stat.sql | 91 ++++++++ 11 files changed, 615 insertions(+), 2 deletions(-) create mode 100644 include/spock_conflict_stat.h create mode 100644 src/spock_conflict_stat.c create mode 100644 tests/regress/expected/conflict_stat.out create mode 100644 tests/regress/sql/conflict_stat.sql diff --git a/Makefile b/Makefile index efb1b93c..3584a839 100644 --- a/Makefile +++ b/Makefile @@ -50,8 +50,14 @@ all: spock.control # ----------------------------------------------------------------------------- # Regression tests # ----------------------------------------------------------------------------- +# PG18+ only tests +REGRESS_PG18 = +ifeq ($(shell test $(PGVER) -ge 18 && echo yes),yes) +REGRESS_PG18 = conflict_stat +endif + REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondary_unique \ - excluded_schema \ + excluded_schema $(REGRESS_PG18) \ toasted replication_set matview bidirectional primary_key \ interfaces foreign_key copy sequence triggers parallel functions row_filter \ row_filter_sampling att_list column_filter apply_delay \ diff --git a/include/spock_conflict.h b/include/spock_conflict.h index 8be93fe2..be19a8f2 100644 --- a/include/spock_conflict.h +++ b/include/spock_conflict.h @@ -51,7 +51,7 @@ extern bool spock_save_resolutions; typedef enum { /* The row to be inserted violates unique constraint */ - SPOCK_CT_INSERT_EXISTS, + SPOCK_CT_INSERT_EXISTS = 0, /* The row to be updated was modified by a different origin */ SPOCK_CT_UPDATE_ORIGIN_DIFFERS, @@ -76,6 +76,12 @@ typedef enum } SpockConflictType; +/* + * SPOCK_CT_DELETE_LATE is excluded because it is not yet tracked in conflict + * statistics. + */ +#define SPOCK_CONFLICT_NUM_TYPES (SPOCK_CT_DELETE_MISSING + 1) + extern int spock_conflict_resolver; extern int spock_conflict_log_level; extern bool spock_save_resolutions; diff --git a/include/spock_conflict_stat.h b/include/spock_conflict_stat.h new file mode 100644 index 00000000..492f8446 --- /dev/null +++ b/include/spock_conflict_stat.h @@ -0,0 +1,46 @@ +/*------------------------------------------------------------------------- + * + * spock_conflict_stat.h + * spock subscription conflict statistics + * + * Copyright (c) 2022-2026, pgEdge, Inc. + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, The Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#ifndef SPOCK_CONFLICT_STAT_H +#define SPOCK_CONFLICT_STAT_H + +#include "postgres.h" + +#if PG_VERSION_NUM >= 180000 + +#include "pgstat.h" + +#include "spock_conflict.h" + +/* Shared memory stats entry for spock subscription conflicts */ +typedef struct Spock_Stat_StatSubEntry +{ + PgStat_Counter conflict_count[SPOCK_CONFLICT_NUM_TYPES]; + TimestampTz stat_reset_timestamp; +} Spock_Stat_StatSubEntry; + +/* Pending (backend-local) entry for spock subscription conflicts */ +typedef struct Spock_Stat_PendingSubEntry +{ + PgStat_Counter conflict_count[SPOCK_CONFLICT_NUM_TYPES]; +} Spock_Stat_PendingSubEntry; + +extern void spock_stat_register_conflict_stat(void); + +extern void spock_stat_report_subscription_conflict(Oid subid, + SpockConflictType type); +extern void spock_stat_create_subscription(Oid subid); +extern void spock_stat_drop_subscription(Oid subid); +extern Spock_Stat_StatSubEntry *spock_stat_fetch_stat_subscription(Oid subid); + +#endif /* PG_VERSION_NUM >= 180000 */ + +#endif /* SPOCK_CONFLICT_STAT_H */ \ No newline at end of file diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index 9cf0088d..0e80391d 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -714,6 +714,29 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- ---- +-- Subscription conflict statistics +-- ---- +CREATE FUNCTION spock.get_subscription_stats( + subid oid, + OUT subid oid, + OUT confl_insert_exists bigint, + OUT confl_update_origin_differs bigint, + OUT confl_update_exists bigint, + OUT confl_update_missing bigint, + OUT confl_delete_origin_differs bigint, + OUT confl_delete_missing bigint, + OUT stats_reset timestamptz +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_get_subscription_stats' +LANGUAGE C STABLE; + +CREATE FUNCTION spock.reset_subscription_stats(subid oid DEFAULT NULL) +RETURNS void +AS 'MODULE_PATHNAME', 'spock_reset_subscription_stats' +LANGUAGE C CALLED ON NULL INPUT VOLATILE; + -- Set delta_apply security label on specific column CREATE FUNCTION spock.delta_apply( rel regclass, diff --git a/src/spock.c b/src/spock.c index 2aa74433..f6107a8c 100644 --- a/src/spock.c +++ b/src/spock.c @@ -53,6 +53,9 @@ #include "pgstat.h" #include "spock_apply.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_executor.h" #include "spock_node.h" #include "spock_conflict.h" @@ -1257,4 +1260,9 @@ _PG_init(void) /* Security label provider hook */ register_label_provider(SPOCK_SECLABEL_PROVIDER, spock_object_relabel); + +#if PG_VERSION_NUM >= 180000 + /* Spock replication conflict statistics */ + spock_stat_register_conflict_stat(); +#endif } diff --git a/src/spock_apply_heap.c b/src/spock_apply_heap.c index 84fe935f..dd060f7f 100644 --- a/src/spock_apply_heap.c +++ b/src/spock_apply_heap.c @@ -64,6 +64,7 @@ #include "spock_common.h" #include "spock_conflict.h" +#include "spock_conflict_stat.h" #include "spock_executor.h" #include "spock_node.h" #include "spock_proto_native.h" @@ -1067,6 +1068,17 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup, /* SPOCK_CT_UPDATE_MISSING case gets logged in exception_log, not resolutions */ SpockExceptionLog *exception_log = &exception_log_ptr[my_exception_log_index]; +#if PG_VERSION_NUM >= 180000 + if (!MyApplyWorker->use_try_block) + /* + * To avoid duplicated messages complain only in case we are on the + * successful path way. We don't count the conflict if something + * goes wrong already because the update logic is broken yet and + * this conflict may be misleading. + */ + spock_stat_report_subscription_conflict(MyApplyWorker->subid, + SPOCK_CT_UPDATE_MISSING); +#endif /* * The tuple to be updated could not be found. Do nothing except for * emitting a log message. TODO: Add pkey information as well. diff --git a/src/spock_conflict.c b/src/spock_conflict.c index 4f9c7d00..497222ae 100644 --- a/src/spock_conflict.c +++ b/src/spock_conflict.c @@ -51,6 +51,9 @@ #include "spock.h" #include "spock_conflict.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_proto_native.h" #include "spock_node.h" #include "spock_worker.h" @@ -424,6 +427,16 @@ spock_report_conflict(SpockConflictType conflict_type, } +#if PG_VERSION_NUM >= 180000 + /* + * TODO: Can't enable until SPOCK_CT_DELETE_LATE is either included in + * SPOCK_CONFLICT_NUM_TYPES or filtered out here — passing it as-is would + * overflow the conflict_count[] array. + * + * spock_stat_report_subscription_conflict(MyApplyWorker->subid, conflict_type); + */ +#endif + if (save_in_resolutions) { /* Count statistics */ @@ -432,6 +445,8 @@ spock_report_conflict(SpockConflictType conflict_type, /* If configured log resolution to table */ spock_conflict_log_table(conflict_type, rel, localtuple, oldkey, + /* If configured log resolution to spock.resolutions table */ + spock_conflict_log_table(conflict_type, rel, localtuple, oldkey, remotetuple, applytuple, resolution, local_tuple_xid, found_local_origin, local_tuple_origin, local_tuple_commit_ts, diff --git a/src/spock_conflict_stat.c b/src/spock_conflict_stat.c new file mode 100644 index 00000000..9f490dd4 --- /dev/null +++ b/src/spock_conflict_stat.c @@ -0,0 +1,255 @@ +/*------------------------------------------------------------------------- + * + * spock_conflict_stat.c + * spock subscription conflict statistics + * + * NOTE: Unlike PostgreSQL subscription statistics, Spock statistics cannot be + * cluster-wide because spock node ID, origin ID, and subscription ID are + * unique only within a database. Therefore, we use MyDatabaseId to identify + * each statistics entry. + * + * Copyright (c) 2022-2026, pgEdge, Inc. + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, The Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#if PG_VERSION_NUM >= 180000 + +#include "funcapi.h" +#include "utils/pgstat_internal.h" + +#include "spock.h" +#include "spock_conflict_stat.h" + +/* + * Kind ID reserved for statistics of spock replication conflicts. + * TODO: ask Michael Paquier about exact numbers and conflict detection + */ +#define SPOCK_PGSTAT_KIND_LRCONFLICTS 27 + +/* Shared memory wrapper for spock subscription conflict stats */ +typedef struct Spock_Stat_Subscription +{ + PgStatShared_Common header; + Spock_Stat_StatSubEntry stats; +} Spock_Stat_Subscription; + +/* + * Column names for spock_get_subscription_stats(), indexed by + * SpockConflictType. Kept in sync with the enum via designated initializers + * so that reordering the enum produces a compile-time error rather than + * silently wrong output. + */ +static const char *const SpockConflictStatColNames[SPOCK_CONFLICT_NUM_TYPES] = { + [SPOCK_CT_INSERT_EXISTS] = "confl_insert_exists", + [SPOCK_CT_UPDATE_ORIGIN_DIFFERS] = "confl_update_origin_differs", + [SPOCK_CT_UPDATE_EXISTS] = "confl_update_exists", + [SPOCK_CT_UPDATE_MISSING] = "confl_update_missing", + [SPOCK_CT_DELETE_ORIGIN_DIFFERS] = "confl_delete_origin_differs", + [SPOCK_CT_DELETE_MISSING] = "confl_delete_missing", +}; + +PG_FUNCTION_INFO_V1(spock_get_subscription_stats); +PG_FUNCTION_INFO_V1(spock_reset_subscription_stats); + +static bool spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, + bool nowait); +static void spock_stat_subscription_reset_timestamp_cb( + PgStatShared_Common *header, + TimestampTz ts); + +/* + * We rely on the pgstat infrastructure here, employing spock's own conflict + * detection algorithm with custom statistics storage. + */ + +static const PgStat_KindInfo spock_conflict_stat = { + .name = "spock_conflict_stat", + .fixed_amount = false, + .write_to_file = true, + + .shared_size = sizeof(Spock_Stat_Subscription), + .shared_data_off = offsetof(Spock_Stat_Subscription, stats), + .shared_data_len = sizeof(((Spock_Stat_Subscription *) 0)->stats), + .pending_size = sizeof(Spock_Stat_PendingSubEntry), + + .flush_pending_cb = spock_stat_subscription_flush_cb, + .reset_timestamp_cb = spock_stat_subscription_reset_timestamp_cb, +}; + +void +spock_stat_register_conflict_stat(void) +{ + pgstat_register_kind(SPOCK_PGSTAT_KIND_LRCONFLICTS, &spock_conflict_stat); +} + +/* + * Report a subscription conflict. + */ +void +spock_stat_report_subscription_conflict(Oid subid, SpockConflictType type) +{ + PgStat_EntryRef *entry_ref; + Spock_Stat_PendingSubEntry *pending; + + Assert(type >= 0 && type < SPOCK_CONFLICT_NUM_TYPES); + + entry_ref = pgstat_prep_pending_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, + MyDatabaseId, subid, NULL); + pending = entry_ref->pending; + pending->conflict_count[type]++; +} + +/* + * Report creating the subscription. + */ +void +spock_stat_create_subscription(Oid subid) +{ + /* Ensures that stats are dropped if transaction rolls back */ + pgstat_create_transactional(SPOCK_PGSTAT_KIND_LRCONFLICTS, + MyDatabaseId, subid); + + /* Create and initialize the subscription stats entry */ + pgstat_get_entry_ref(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, + true, NULL); + pgstat_reset_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, 0); +} + +/* + * Report dropping the subscription. + * + * Ensures that stats are dropped if transaction commits. + */ +void +spock_stat_drop_subscription(Oid subid) +{ + pgstat_drop_transactional(SPOCK_PGSTAT_KIND_LRCONFLICTS, + MyDatabaseId, subid); +} + +/* + * Support function for the SQL-callable pgstat* functions. Returns + * the collected statistics for one subscription or NULL. + */ +Spock_Stat_StatSubEntry * +spock_stat_fetch_stat_subscription(Oid subid) +{ + return (Spock_Stat_StatSubEntry *) + pgstat_fetch_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid); +} + +/* + * Get the subscription statistics for the given subscription. If the + * subscription statistics is not available, return all-zeros stats. + */ +Datum +spock_get_subscription_stats(PG_FUNCTION_ARGS) +{ +#define SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS (1 + SPOCK_CONFLICT_NUM_TYPES + 1) + Oid subid = PG_GETARG_OID(0); + TupleDesc tupdesc; + Datum values[SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; + bool nulls[SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; + Spock_Stat_StatSubEntry *subentry; + Spock_Stat_StatSubEntry allzero; + int i = 0; + AttrNumber attnum = 1; + + /* Get subscription stats */ + subentry = spock_stat_fetch_stat_subscription(subid); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS); + TupleDescInitEntry(tupdesc, attnum++, "subid", + OIDOID, -1, 0); + for (int c = 0; c < SPOCK_CONFLICT_NUM_TYPES; c++) + TupleDescInitEntry(tupdesc, attnum++, SpockConflictStatColNames[c], + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, attnum++, "stats_reset", + TIMESTAMPTZOID, -1, 0); + BlessTupleDesc(tupdesc); + + if (!subentry) + { + /* If the subscription is not found, initialise its stats */ + memset(&allzero, 0, sizeof(Spock_Stat_StatSubEntry)); + subentry = &allzero; + } + + /* subid */ + values[i++] = ObjectIdGetDatum(subid); + + /* conflict counts */ + for (int nconflict = 0; nconflict < SPOCK_CONFLICT_NUM_TYPES; nconflict++) + values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]); + + /* stats_reset */ + if (subentry->stat_reset_timestamp == 0) + nulls[i] = true; + else + values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp); + + Assert(i + 1 == SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS); + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} +#undef SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS + +/* Reset subscription stats (a specific one or all of them) */ +Datum +spock_reset_subscription_stats(PG_FUNCTION_ARGS) +{ + Oid subid; + + if (PG_ARGISNULL(0)) + { + /* Clear all subscription stats */ + pgstat_reset_of_kind(SPOCK_PGSTAT_KIND_LRCONFLICTS); + } + else + { + subid = PG_GETARG_OID(0); + + if (!OidIsValid(subid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid subscription OID %u", subid))); + pgstat_reset(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid); + } + + PG_RETURN_VOID(); +} + +static bool +spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) +{ + Spock_Stat_PendingSubEntry *localent; + Spock_Stat_Subscription *shsubent; + + localent = (Spock_Stat_PendingSubEntry *) entry_ref->pending; + shsubent = (Spock_Stat_Subscription *) entry_ref->shared_stats; + + /* localent always has non-zero content */ + + if (!pgstat_lock_entry(entry_ref, nowait)) + return false; + + for (int i = 0; i < SPOCK_CONFLICT_NUM_TYPES; i++) + shsubent->stats.conflict_count[i] += localent->conflict_count[i]; + + pgstat_unlock_entry(entry_ref); + return true; +} + +static void +spock_stat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) +{ + ((Spock_Stat_Subscription *) header)->stats.stat_reset_timestamp = ts; +} + +#endif /* PG_VERSION_NUM >= 180000 */ diff --git a/src/spock_functions.c b/src/spock_functions.c index c818233a..a8e0190f 100644 --- a/src/spock_functions.c +++ b/src/spock_functions.c @@ -86,6 +86,9 @@ #include "spock_apply.h" #include "spock_conflict.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_dependency.h" #include "spock_executor.h" #include "spock_node.h" @@ -592,6 +595,9 @@ spock_create_subscription(PG_FUNCTION_ARGS) sub.skip_schema = textarray_to_list(skip_schema_names); create_subscription(&sub); +#if PG_VERSION_NUM >= 180000 + spock_stat_create_subscription(sub.id); +#endif /* Create progress entry to track commit ts per local/remote origin */ spock_group_attach(MyDatabaseId, localnode->node->id, originif.nodeid); @@ -666,6 +672,9 @@ spock_drop_subscription(PG_FUNCTION_ARGS) /* Drop the actual subscription. */ drop_subscription(sub->id); +#if PG_VERSION_NUM >= 180000 + spock_stat_drop_subscription(sub->id); +#endif /* * The rest is different depending on if we are doing this on provider diff --git a/tests/regress/expected/conflict_stat.out b/tests/regress/expected/conflict_stat.out new file mode 100644 index 00000000..2801b17c --- /dev/null +++ b/tests/regress/expected/conflict_stat.out @@ -0,0 +1,142 @@ +-- Test: conflict statistics for UPDATE_MISSING conflicts (PG18+ only) +SELECT * FROM spock_regress_variables() +\gset +\c :provider_dsn +-- Create a simple table for conflict testing +SELECT spock.replicate_ddl($$ + CREATE TABLE public.conflict_stat_test ( + id integer PRIMARY KEY, + data text + ); +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT * FROM spock.repset_add_table('default', 'conflict_stat_test'); + repset_add_table +------------------ + t +(1 row) + +-- Seed initial rows +INSERT INTO conflict_stat_test VALUES (1, 'row1'); +INSERT INTO conflict_stat_test VALUES (2, 'row2'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Get subscription OID for stats queries +SELECT sub_id AS test_sub_id FROM spock.subscription + WHERE sub_name = 'test_subscription' \gset +-- Reset conflict stats before the test +SELECT spock.reset_subscription_stats(:test_sub_id); + reset_subscription_stats +-------------------------- + +(1 row) + +-- Verify counters are zero initially +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 0 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Delete a row on subscriber only to set up UPDATE_MISSING +DELETE FROM conflict_stat_test WHERE id = 1; +TRUNCATE spock.exception_log; +\c :provider_dsn +-- Update the row that no longer exists on subscriber +UPDATE conflict_stat_test SET data = 'updated_row1' WHERE id = 1; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Row id=1 should still be missing on subscriber (update was skipped) +SELECT * FROM conflict_stat_test ORDER BY id; + id | data +----+------ + 2 | row2 +(1 row) + +-- The UPDATE_MISSING conflict should be logged in exception_log +SELECT operation, table_name FROM spock.exception_log; + operation | table_name +-----------+-------------------- + UPDATE | conflict_stat_test +(1 row) + +-- Verify that the UPDATE_MISSING conflict was counted +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 1 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Provoke a second UPDATE_MISSING to confirm counter increments +DELETE FROM conflict_stat_test WHERE id = 2; +TRUNCATE spock.exception_log; +\c :provider_dsn +UPDATE conflict_stat_test SET data = 'updated_row2' WHERE id = 2; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +SELECT * FROM conflict_stat_test ORDER BY id; + id | data +----+------ +(0 rows) + +-- Counter should now be 2 +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 2 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Test reset: clear the stats and verify counter goes back to zero +SELECT spock.reset_subscription_stats(:test_sub_id); + reset_subscription_stats +-------------------------- + +(1 row) + +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 0 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Cleanup +TRUNCATE spock.exception_log; +\c :provider_dsn +SELECT spock.replicate_ddl($$ DROP TABLE public.conflict_stat_test CASCADE; $$); +NOTICE: drop cascades to table conflict_stat_test membership in replication set default + replicate_ddl +--------------- + t +(1 row) + diff --git a/tests/regress/sql/conflict_stat.sql b/tests/regress/sql/conflict_stat.sql new file mode 100644 index 00000000..f6d74a3e --- /dev/null +++ b/tests/regress/sql/conflict_stat.sql @@ -0,0 +1,91 @@ +-- Test: conflict statistics for UPDATE_MISSING conflicts (PG18+ only) +SELECT * FROM spock_regress_variables() +\gset + +\c :provider_dsn + +-- Create a simple table for conflict testing +SELECT spock.replicate_ddl($$ + CREATE TABLE public.conflict_stat_test ( + id integer PRIMARY KEY, + data text + ); +$$); + +SELECT * FROM spock.repset_add_table('default', 'conflict_stat_test'); + +-- Seed initial rows +INSERT INTO conflict_stat_test VALUES (1, 'row1'); +INSERT INTO conflict_stat_test VALUES (2, 'row2'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- Get subscription OID for stats queries +SELECT sub_id AS test_sub_id FROM spock.subscription + WHERE sub_name = 'test_subscription' \gset + +-- Reset conflict stats before the test +SELECT spock.reset_subscription_stats(:test_sub_id); + +-- Verify counters are zero initially +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Delete a row on subscriber only to set up UPDATE_MISSING +DELETE FROM conflict_stat_test WHERE id = 1; +TRUNCATE spock.exception_log; + +\c :provider_dsn + +-- Update the row that no longer exists on subscriber +UPDATE conflict_stat_test SET data = 'updated_row1' WHERE id = 1; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- Row id=1 should still be missing on subscriber (update was skipped) +SELECT * FROM conflict_stat_test ORDER BY id; + +-- The UPDATE_MISSING conflict should be logged in exception_log +SELECT operation, table_name FROM spock.exception_log; + +-- Verify that the UPDATE_MISSING conflict was counted +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Provoke a second UPDATE_MISSING to confirm counter increments +DELETE FROM conflict_stat_test WHERE id = 2; +TRUNCATE spock.exception_log; + +\c :provider_dsn + +UPDATE conflict_stat_test SET data = 'updated_row2' WHERE id = 2; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +SELECT * FROM conflict_stat_test ORDER BY id; + +-- Counter should now be 2 +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Test reset: clear the stats and verify counter goes back to zero +SELECT spock.reset_subscription_stats(:test_sub_id); + +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Cleanup +TRUNCATE spock.exception_log; +\c :provider_dsn +SELECT spock.replicate_ddl($$ DROP TABLE public.conflict_stat_test CASCADE; $$); From e1d692c93b7e7420dde39ac80fc66e18bd073d3c Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Fri, 20 Feb 2026 10:36:19 +0100 Subject: [PATCH 2/7] Fix replication_set regression test for deterministic output Truncate the log on the first subscriber visit so subsequent checks are not polluted. Also drop the non-deterministic command_counter column from exception_log queries and order by (table_schema, table_name, remote_commit_ts) with COLLATE "C" instead, making the output stable across runs. --- tests/regress/expected/replication_set.out | 50 +++++++++++----------- tests/regress/sql/replication_set.sql | 12 ++++-- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index 398ea9ac..5d917efb 100644 --- a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -223,6 +223,8 @@ NOTICE: drop cascades to 2 other objects (1 row) \c :subscriber_dsn +-- First time come by to the subscriber node. Clean the history in exception_log +TRUNCATE spock.exception_log; SELECT * FROM spock.replication_set; set_id | set_nodeid | set_name | replicate_insert | replicate_update | replicate_delete | replicate_truncate ------------+------------+---------------------+------------------+------------------+------------------+-------------------- @@ -445,7 +447,7 @@ SELECT * FROM spoc_102l ORDER BY x; -- Check exception log format SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -454,15 +456,15 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message ------------------+--------------+------------+-----------+----------------------------------------------------+------------------------------------------ - 1 | | | INSERT | | Spock can't find relation - 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; + table_schema | table_name | operation | remote_new_tup | error_message +--------------+------------+-----------+----------------------------------------------------+------------------------------------------ + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid (6 rows) \c :provider_dsn @@ -562,7 +564,7 @@ SELECT * FROM spoc_102g_u ORDER BY x; (2 rows) SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -571,19 +573,19 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message ------------------+--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- - 1 | | | INSERT | | Spock can't find relation - 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 7 | | | UPDATE | | Spock can't find relation - 8 | | | UPDATE | | Spock can't find relation - 9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 10 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; + table_schema | table_name | operation | remote_new_tup | error_message +--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | UPDATE | | Spock can't find relation + | | UPDATE | | Spock can't find relation + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) (10 rows) \c :provider_dsn diff --git a/tests/regress/sql/replication_set.sql b/tests/regress/sql/replication_set.sql index b9132a73..dae09930 100644 --- a/tests/regress/sql/replication_set.sql +++ b/tests/regress/sql/replication_set.sql @@ -98,6 +98,10 @@ SELECT spock.replicate_ddl($$ $$); \c :subscriber_dsn + +-- First time come by to the subscriber node. Clean the history in exception_log +TRUNCATE spock.exception_log; + SELECT * FROM spock.replication_set; -- Issue SPOC-102 @@ -203,7 +207,7 @@ SELECT * FROM spoc_102l ORDER BY x; -- Check exception log format SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -212,7 +216,7 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g,spoc_102l CASCADE'); @@ -257,7 +261,7 @@ SELECT * FROM spoc_102l_u ORDER BY x; SELECT * FROM spoc_102g_u ORDER BY x; SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -266,7 +270,7 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g_u,spoc_102l_u CASCADE'); From e390b37e6eb14b5d3056c798c6f5b19ac8bf4151 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Fri, 20 Feb 2026 10:58:09 +0100 Subject: [PATCH 3/7] Changes after the C-rabbit review. Guard spock_conflict_stat.h include and improve conflict type validation --- .github/workflows/installcheck.yml | 17 +++++++ src/spock_apply_heap.c | 9 ++-- src/spock_conflict_stat.c | 78 +++++++++++++++++++++++++----- 3 files changed, 88 insertions(+), 16 deletions(-) diff --git a/.github/workflows/installcheck.yml b/.github/workflows/installcheck.yml index 2ed6825f..59b40fe1 100644 --- a/.github/workflows/installcheck.yml +++ b/.github/workflows/installcheck.yml @@ -53,6 +53,7 @@ jobs: version: latest - name: Start docker cluster + id: start_cluster run: | cd ${GITHUB_WORKSPACE}/tests/docker/ # To minimize regression tests difference, override pgedge.env with @@ -63,6 +64,22 @@ jobs: PGVER=${{ matrix.pgver }} DBUSER=regression DBNAME=regression \ docker compose up --build --wait -d timeout-minutes: 20 + continue-on-error: true + + - name: Diagnose cluster startup failure + if: steps.start_cluster.outcome == 'failure' + run: | + cd ${GITHUB_WORKSPACE}/tests/docker/ + echo "=== Docker container status ===" + docker compose ps -a + for node in n1 n2 n3; do + echo "" + echo "=== Container logs: $node ===" + docker compose logs pgedge-$node 2>&1 | tail -80 || true + echo "" + echo "=== PostgreSQL logfile: $node ===" + docker compose cp pgedge-$node:/home/pgedge/logfile.log /dev/stdout 2>/dev/null | tail -80 || echo "(not available)" + done - name: Run installcheck on node n1 id: installcheck diff --git a/src/spock_apply_heap.c b/src/spock_apply_heap.c index dd060f7f..0d50dbc4 100644 --- a/src/spock_apply_heap.c +++ b/src/spock_apply_heap.c @@ -64,7 +64,9 @@ #include "spock_common.h" #include "spock_conflict.h" +#if PG_VERSION_NUM >= 180000 #include "spock_conflict_stat.h" +#endif #include "spock_executor.h" #include "spock_node.h" #include "spock_proto_native.h" @@ -1071,10 +1073,9 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup, #if PG_VERSION_NUM >= 180000 if (!MyApplyWorker->use_try_block) /* - * To avoid duplicated messages complain only in case we are on the - * successful path way. We don't count the conflict if something - * goes wrong already because the update logic is broken yet and - * this conflict may be misleading. + * To avoid duplicate messages, only report the conflict on the + * successful pathway. We skip counting when the update logic has + * already failed because the conflict would be misleading. */ spock_stat_report_subscription_conflict(MyApplyWorker->subid, SPOCK_CT_UPDATE_MISSING); diff --git a/src/spock_conflict_stat.c b/src/spock_conflict_stat.c index 9f490dd4..7d5fc311 100644 --- a/src/spock_conflict_stat.c +++ b/src/spock_conflict_stat.c @@ -16,25 +16,29 @@ */ #include "postgres.h" -#if PG_VERSION_NUM >= 180000 - #include "funcapi.h" #include "utils/pgstat_internal.h" #include "spock.h" + +PG_FUNCTION_INFO_V1(spock_get_subscription_stats); +PG_FUNCTION_INFO_V1(spock_reset_subscription_stats); + +#if PG_VERSION_NUM >= 180000 #include "spock_conflict_stat.h" /* * Kind ID reserved for statistics of spock replication conflicts. - * TODO: ask Michael Paquier about exact numbers and conflict detection + * TODO: see https://wiki.postgresql.org/wiki/CustomCumulativeStats to choose + * specific value in production */ -#define SPOCK_PGSTAT_KIND_LRCONFLICTS 27 +#define SPOCK_PGSTAT_KIND_LRCONFLICTS 28 /* Shared memory wrapper for spock subscription conflict stats */ typedef struct Spock_Stat_Subscription { PgStatShared_Common header; - Spock_Stat_StatSubEntry stats; + Spock_Stat_StatSubEntry stats; } Spock_Stat_Subscription; /* @@ -52,9 +56,6 @@ static const char *const SpockConflictStatColNames[SPOCK_CONFLICT_NUM_TYPES] = { [SPOCK_CT_DELETE_MISSING] = "confl_delete_missing", }; -PG_FUNCTION_INFO_V1(spock_get_subscription_stats); -PG_FUNCTION_INFO_V1(spock_reset_subscription_stats); - static bool spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait); static void spock_stat_subscription_reset_timestamp_cb( @@ -95,7 +96,15 @@ spock_stat_report_subscription_conflict(Oid subid, SpockConflictType type) PgStat_EntryRef *entry_ref; Spock_Stat_PendingSubEntry *pending; - Assert(type >= 0 && type < SPOCK_CONFLICT_NUM_TYPES); + if (type != SPOCK_CT_UPDATE_MISSING) + /* + * Should happen only in development. Detect it as fast as possible + * with the highest error level that does not crash the instance. + */ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("unexpected conflict type %d reported for subscription %u", + type, subid))); entry_ref = pgstat_prep_pending_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, NULL); @@ -109,13 +118,31 @@ spock_stat_report_subscription_conflict(Oid subid, SpockConflictType type) void spock_stat_create_subscription(Oid subid) { + PgStat_EntryRef *ref; + /* Ensures that stats are dropped if transaction rolls back */ pgstat_create_transactional(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid); /* Create and initialize the subscription stats entry */ - pgstat_get_entry_ref(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, - true, NULL); + ref = pgstat_get_entry_ref(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, + true, NULL); + + if (pg_atomic_read_u32(&ref->shared_entry->refcount) != 2) + /* + * Should never happen: a new subscription stats entry should have + * exactly two references (the hashtable entry and our own). A higher + * count means a stale entry from a previous subscription with the same + * OID was not properly cleaned up. + */ + ereport(WARNING, + (errmsg("conflict statistics entry for subscription %u " + "already has %u references", + subid, + pg_atomic_read_u32(&ref->shared_entry->refcount)), + errhint("This may indicate that a previous subscription with " + "the same OID was not fully dropped."))); + pgstat_reset_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, 0); } @@ -247,9 +274,36 @@ spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) } static void -spock_stat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) +spock_stat_subscription_reset_timestamp_cb(PgStatShared_Common *header, + TimestampTz ts) { ((Spock_Stat_Subscription *) header)->stats.stat_reset_timestamp = ts; } #endif /* PG_VERSION_NUM >= 180000 */ + +#if PG_VERSION_NUM < 180000 + +/* + * XXX: implement conflict statistics gathering, if needed + */ + +Datum +spock_get_subscription_stats(PG_FUNCTION_ARGS) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("spock conflict statistics require PostgreSQL 18 or later"))); + PG_RETURN_NULL(); /* unreachable; suppress compiler warning */ +} + +Datum +spock_reset_subscription_stats(PG_FUNCTION_ARGS) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("spock conflict statistics require PostgreSQL 18 or later"))); + PG_RETURN_NULL(); /* unreachable; suppress compiler warning */ +} + +#endif /* PG_VERSION_NUM < 180000 */ From 267119a19f0f725532ec09ed7a238a7506b4d231 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Wed, 25 Feb 2026 09:52:58 +0100 Subject: [PATCH 4/7] Changes after Ibrar's review --- sql/spock--5.0.6--6.0.0-devel.sql | 23 +++++++++++++++++++++++ src/spock_conflict.c | 10 ++++------ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/sql/spock--5.0.6--6.0.0-devel.sql b/sql/spock--5.0.6--6.0.0-devel.sql index 909d96ae..cfb40761 100644 --- a/sql/spock--5.0.6--6.0.0-devel.sql +++ b/sql/spock--5.0.6--6.0.0-devel.sql @@ -68,6 +68,29 @@ SET conflict_type = CASE conflict_type ELSE conflict_type END; +-- ---- +-- Subscription conflict statistics +-- ---- +CREATE FUNCTION spock.get_subscription_stats( + subid oid, + OUT subid oid, + OUT confl_insert_exists bigint, + OUT confl_update_origin_differs bigint, + OUT confl_update_exists bigint, + OUT confl_update_missing bigint, + OUT confl_delete_origin_differs bigint, + OUT confl_delete_missing bigint, + OUT stats_reset timestamptz +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_get_subscription_stats' +LANGUAGE C STABLE; + +CREATE FUNCTION spock.reset_subscription_stats(subid oid DEFAULT NULL) +RETURNS void +AS 'MODULE_PATHNAME', 'spock_reset_subscription_stats' +LANGUAGE C CALLED ON NULL INPUT VOLATILE; + -- Set delta_apply security label on specific column CREATE FUNCTION spock.delta_apply( rel regclass, diff --git a/src/spock_conflict.c b/src/spock_conflict.c index 497222ae..83e70475 100644 --- a/src/spock_conflict.c +++ b/src/spock_conflict.c @@ -445,12 +445,10 @@ spock_report_conflict(SpockConflictType conflict_type, /* If configured log resolution to table */ spock_conflict_log_table(conflict_type, rel, localtuple, oldkey, - /* If configured log resolution to spock.resolutions table */ - spock_conflict_log_table(conflict_type, rel, localtuple, oldkey, - remotetuple, applytuple, resolution, - local_tuple_xid, found_local_origin, - local_tuple_origin, local_tuple_commit_ts, - conflict_idx_oid); + remotetuple, applytuple, resolution, + local_tuple_xid, found_local_origin, + local_tuple_origin, local_tuple_commit_ts, + conflict_idx_oid); } memset(local_tup_ts_str, 0, MAXDATELEN); From 4035682689f2e950320f928e79e16f07fd640fa3 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Mon, 23 Mar 2026 13:43:35 +0100 Subject: [PATCH 5/7] Extend conflict statistics to cover all SpockConflictType values Previously spock_stat_report_subscription_conflict() only accepted SPOCK_CT_UPDATE_MISSING and raised ERROR on any other type. The Spock-specific SPOCK_CT_DELETE_EXISTS (enum value 101) could not be used as a direct array index, which blocked enabling stats reporting from spock_report_conflict(). Introduce spock_conflict_stat_index() to map the non-contiguous enum values to contiguous array indices, bump SPOCK_CONFLICT_NUM_TYPES from 6 to 7, and enable stat reporting for all conflict types including INSERT_EXISTS, UPDATE_ORIGIN_DIFFERS, UPDATE_EXISTS, DELETE_ORIGIN_DIFFERS, DELETE_MISSING, and DELETE_EXISTS. Add confl_delete_exists column to get_subscription_stats() output. --- include/spock_conflict.h | 22 +++++++++++++--- sql/spock--5.0.6--6.0.0-devel.sql | 1 + sql/spock--6.0.0-devel.sql | 1 + src/spock_conflict.c | 8 +----- src/spock_conflict_stat.c | 32 ++++++++++++------------ tests/regress/expected/conflict_stat.out | 32 ++++++++++++------------ tests/regress/sql/conflict_stat.sql | 8 +++--- 7 files changed, 58 insertions(+), 46 deletions(-) diff --git a/include/spock_conflict.h b/include/spock_conflict.h index be19a8f2..994ad8ab 100644 --- a/include/spock_conflict.h +++ b/include/spock_conflict.h @@ -77,10 +77,26 @@ typedef enum } SpockConflictType; /* - * SPOCK_CT_DELETE_LATE is excluded because it is not yet tracked in conflict - * statistics. + * Number of conflict types tracked in statistics. Includes all types in + * SpockConflictType. Because SPOCK_CT_DELETE_EXISTS has a non-contiguous + * enum value (101), use spock_conflict_stat_index() to map enum values to + * contiguous array indices. */ -#define SPOCK_CONFLICT_NUM_TYPES (SPOCK_CT_DELETE_MISSING + 1) +#define SPOCK_CONFLICT_NUM_TYPES 7 + +/* + * Map a SpockConflictType enum value to a contiguous array index + * suitable for conflict_count[] arrays. Returns -1 for unknown types. + */ +static inline int +spock_conflict_stat_index(SpockConflictType type) +{ + if (type <= SPOCK_CT_DELETE_MISSING) + return (int) type; + if (type == SPOCK_CT_DELETE_EXISTS) + return SPOCK_CT_DELETE_MISSING + 1; + return -1; +} extern int spock_conflict_resolver; extern int spock_conflict_log_level; diff --git a/sql/spock--5.0.6--6.0.0-devel.sql b/sql/spock--5.0.6--6.0.0-devel.sql index cfb40761..d64e05b2 100644 --- a/sql/spock--5.0.6--6.0.0-devel.sql +++ b/sql/spock--5.0.6--6.0.0-devel.sql @@ -80,6 +80,7 @@ CREATE FUNCTION spock.get_subscription_stats( OUT confl_update_missing bigint, OUT confl_delete_origin_differs bigint, OUT confl_delete_missing bigint, + OUT confl_delete_exists bigint, OUT stats_reset timestamptz ) RETURNS record diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index 0e80391d..ab396b30 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -726,6 +726,7 @@ CREATE FUNCTION spock.get_subscription_stats( OUT confl_update_missing bigint, OUT confl_delete_origin_differs bigint, OUT confl_delete_missing bigint, + OUT confl_delete_exists bigint, OUT stats_reset timestamptz ) RETURNS record diff --git a/src/spock_conflict.c b/src/spock_conflict.c index 83e70475..46d6e9e3 100644 --- a/src/spock_conflict.c +++ b/src/spock_conflict.c @@ -428,13 +428,7 @@ spock_report_conflict(SpockConflictType conflict_type, #if PG_VERSION_NUM >= 180000 - /* - * TODO: Can't enable until SPOCK_CT_DELETE_LATE is either included in - * SPOCK_CONFLICT_NUM_TYPES or filtered out here — passing it as-is would - * overflow the conflict_count[] array. - * - * spock_stat_report_subscription_conflict(MyApplyWorker->subid, conflict_type); - */ + spock_stat_report_subscription_conflict(MyApplyWorker->subid, conflict_type); #endif if (save_in_resolutions) diff --git a/src/spock_conflict_stat.c b/src/spock_conflict_stat.c index 7d5fc311..3b62752f 100644 --- a/src/spock_conflict_stat.c +++ b/src/spock_conflict_stat.c @@ -42,18 +42,19 @@ typedef struct Spock_Stat_Subscription } Spock_Stat_Subscription; /* - * Column names for spock_get_subscription_stats(), indexed by - * SpockConflictType. Kept in sync with the enum via designated initializers - * so that reordering the enum produces a compile-time error rather than - * silently wrong output. + * Column names for spock_get_subscription_stats(), indexed by the contiguous + * stat index (see spock_conflict_stat_index()). Order must match the mapping: + * 0..5 correspond to SPOCK_CT_INSERT_EXISTS..SPOCK_CT_DELETE_MISSING, and + * index 6 corresponds to SPOCK_CT_DELETE_EXISTS. */ static const char *const SpockConflictStatColNames[SPOCK_CONFLICT_NUM_TYPES] = { - [SPOCK_CT_INSERT_EXISTS] = "confl_insert_exists", - [SPOCK_CT_UPDATE_ORIGIN_DIFFERS] = "confl_update_origin_differs", - [SPOCK_CT_UPDATE_EXISTS] = "confl_update_exists", - [SPOCK_CT_UPDATE_MISSING] = "confl_update_missing", - [SPOCK_CT_DELETE_ORIGIN_DIFFERS] = "confl_delete_origin_differs", - [SPOCK_CT_DELETE_MISSING] = "confl_delete_missing", + "confl_insert_exists", + "confl_update_origin_differs", + "confl_update_exists", + "confl_update_missing", + "confl_delete_origin_differs", + "confl_delete_missing", + "confl_delete_exists", }; static bool spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, @@ -95,12 +96,11 @@ spock_stat_report_subscription_conflict(Oid subid, SpockConflictType type) { PgStat_EntryRef *entry_ref; Spock_Stat_PendingSubEntry *pending; + int idx; - if (type != SPOCK_CT_UPDATE_MISSING) - /* - * Should happen only in development. Detect it as fast as possible - * with the highest error level that does not crash the instance. - */ + idx = spock_conflict_stat_index(type); + Assert(idx >= 0 && idx < SPOCK_CONFLICT_NUM_TYPES); + if (idx < 0 || idx >= SPOCK_CONFLICT_NUM_TYPES) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("unexpected conflict type %d reported for subscription %u", @@ -109,7 +109,7 @@ spock_stat_report_subscription_conflict(Oid subid, SpockConflictType type) entry_ref = pgstat_prep_pending_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, NULL); pending = entry_ref->pending; - pending->conflict_count[type]++; + pending->conflict_count[idx]++; } /* diff --git a/tests/regress/expected/conflict_stat.out b/tests/regress/expected/conflict_stat.out index 2801b17c..898db33c 100644 --- a/tests/regress/expected/conflict_stat.out +++ b/tests/regress/expected/conflict_stat.out @@ -43,11 +43,11 @@ SELECT spock.reset_subscription_stats(:test_sub_id); -- Verify counters are zero initially SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, - confl_delete_origin_differs,confl_delete_missing + confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); - confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing -----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- - 0 | 0 | 0 | 0 | 0 | 0 + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing | confl_delete_exists +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+----------------------+--------------------- + 0 | 0 | 0 | 0 | 0 | 0 | 0 (1 row) -- Delete a row on subscriber only to set up UPDATE_MISSING @@ -80,11 +80,11 @@ SELECT operation, table_name FROM spock.exception_log; -- Verify that the UPDATE_MISSING conflict was counted SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, - confl_delete_origin_differs,confl_delete_missing + confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); - confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing -----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- - 1 | 0 | 0 | 0 | 0 | 0 + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing | confl_delete_exists +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+----------------------+--------------------- + 1 | 0 | 0 | 0 | 0 | 0 | 0 (1 row) -- Provoke a second UPDATE_MISSING to confirm counter increments @@ -107,11 +107,11 @@ SELECT * FROM conflict_stat_test ORDER BY id; -- Counter should now be 2 SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, - confl_delete_origin_differs,confl_delete_missing + confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); - confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing -----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- - 2 | 0 | 0 | 0 | 0 | 0 + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing | confl_delete_exists +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+----------------------+--------------------- + 2 | 0 | 0 | 0 | 0 | 0 | 0 (1 row) -- Test reset: clear the stats and verify counter goes back to zero @@ -123,11 +123,11 @@ SELECT spock.reset_subscription_stats(:test_sub_id); SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, - confl_delete_origin_differs,confl_delete_missing + confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); - confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing -----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- - 0 | 0 | 0 | 0 | 0 | 0 + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing | confl_delete_exists +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+----------------------+--------------------- + 0 | 0 | 0 | 0 | 0 | 0 | 0 (1 row) -- Cleanup diff --git a/tests/regress/sql/conflict_stat.sql b/tests/regress/sql/conflict_stat.sql index f6d74a3e..90da6ef8 100644 --- a/tests/regress/sql/conflict_stat.sql +++ b/tests/regress/sql/conflict_stat.sql @@ -31,7 +31,7 @@ SELECT spock.reset_subscription_stats(:test_sub_id); -- Verify counters are zero initially SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, - confl_delete_origin_differs,confl_delete_missing + confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); -- Delete a row on subscriber only to set up UPDATE_MISSING @@ -55,7 +55,7 @@ SELECT operation, table_name FROM spock.exception_log; -- Verify that the UPDATE_MISSING conflict was counted SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, - confl_delete_origin_differs,confl_delete_missing + confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); -- Provoke a second UPDATE_MISSING to confirm counter increments @@ -74,7 +74,7 @@ SELECT * FROM conflict_stat_test ORDER BY id; -- Counter should now be 2 SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, - confl_delete_origin_differs,confl_delete_missing + confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); -- Test reset: clear the stats and verify counter goes back to zero @@ -82,7 +82,7 @@ SELECT spock.reset_subscription_stats(:test_sub_id); SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, - confl_delete_origin_differs,confl_delete_missing + confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); -- Cleanup From 7a7c5f0ce2487d27a8fceb888a3bdd98f6f098af Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Mon, 23 Mar 2026 14:50:23 +0100 Subject: [PATCH 6/7] Add regression tests for INSERT_EXISTS, DELETE_MISSING, and DELETE_EXISTS Extend conflict_stat.sql with INSERT_EXISTS (insert duplicate key from provider when subscriber already has the row) and DELETE_MISSING (delete on provider when row already gone from subscriber) tests, plus a final verification that all accumulated counters match expectations. Add stat counter checks to tuple_origin.sql which already provokes UPDATE_MISSING, DELETE_MISSING, DELETE_EXISTS, and INSERT_EXISTS conflicts. The checks are gated behind a \if version >= 180000 guard so that PG < 18 runs skip the stat queries gracefully. Provide tuple_origin_1.out as the alternative expected output for PG < 18. --- tests/regress/expected/conflict_stat.out | 80 ++++- tests/regress/expected/tuple_origin.out | 53 +++ tests/regress/expected/tuple_origin_1.out | 404 ++++++++++++++++++++++ tests/regress/sql/conflict_stat.sql | 55 +++ tests/regress/sql/tuple_origin.sql | 34 ++ 5 files changed, 625 insertions(+), 1 deletion(-) create mode 100644 tests/regress/expected/tuple_origin_1.out diff --git a/tests/regress/expected/conflict_stat.out b/tests/regress/expected/conflict_stat.out index 898db33c..9d96cce4 100644 --- a/tests/regress/expected/conflict_stat.out +++ b/tests/regress/expected/conflict_stat.out @@ -121,13 +121,91 @@ SELECT spock.reset_subscription_stats(:test_sub_id); (1 row) +-- ============================================================ +-- Test INSERT_EXISTS: insert a row on subscriber, then insert the same key on +-- provider. The apply worker detects the duplicate and resolves the conflict +-- (last_update_wins converts the insert into an update). +-- ============================================================ +-- Re-seed rows so both sides have data again +\c :provider_dsn +INSERT INTO conflict_stat_test VALUES (10, 'provider10'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Pre-insert a conflicting row on the subscriber +INSERT INTO conflict_stat_test VALUES (20, 'sub-only'); +TRUNCATE spock.exception_log; +\c :provider_dsn +-- This INSERT will conflict with the row already on subscriber +INSERT INTO conflict_stat_test VALUES (20, 'from-provider'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- The row should now reflect the resolved value (remote wins) +SELECT * FROM conflict_stat_test WHERE id = 20; + id | data +----+--------------- + 20 | from-provider +(1 row) + +-- Verify INSERT_EXISTS counter incremented +SELECT confl_insert_exists +FROM spock.get_subscription_stats(:test_sub_id); + confl_insert_exists +--------------------- + 1 +(1 row) + +SELECT spock.reset_subscription_stats(:test_sub_id); + reset_subscription_stats +-------------------------- + +(1 row) + +-- ============================================================ +-- Test DELETE_MISSING: delete a row on subscriber first, then delete the same +-- row on provider. The apply worker cannot find the row and reports +-- DELETE_MISSING. +-- ============================================================ +TRUNCATE spock.exception_log; +-- Remove the row on subscriber before provider sends its DELETE +DELETE FROM conflict_stat_test WHERE id = 10; +\c :provider_dsn +DELETE FROM conflict_stat_test WHERE id = 10; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Row should still be absent +SELECT * FROM conflict_stat_test WHERE id = 10; + id | data +----+------ +(0 rows) + SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing | confl_delete_exists ----------------------+---------------------+-----------------------------+---------------------+-----------------------------+----------------------+--------------------- - 0 | 0 | 0 | 0 | 0 | 0 | 0 + 0 | 0 | 0 | 0 | 0 | 1 | 0 +(1 row) + +SELECT spock.reset_subscription_stats(:test_sub_id); + reset_subscription_stats +-------------------------- + (1 row) -- Cleanup diff --git a/tests/regress/expected/tuple_origin.out b/tests/regress/expected/tuple_origin.out index a1516997..d869f02a 100644 --- a/tests/regress/expected/tuple_origin.out +++ b/tests/regress/expected/tuple_origin.out @@ -1,6 +1,8 @@ --Tuple Origin SELECT * FROM spock_regress_variables() \gset +\c :subscriber_dsn +SELECT current_setting('server_version_num')::int >= 180000 AS has_conflict_stats \gset \c :provider_dsn ALTER SYSTEM SET spock.save_resolutions = on; SELECT pg_reload_conf(); @@ -11,6 +13,17 @@ SELECT pg_reload_conf(); \c :subscriber_dsn TRUNCATE spock.resolutions; +-- Reset conflict stats (PG18+ only) +\if :has_conflict_stats +SELECT sub_id AS origin_test_sub_id FROM spock.subscription + WHERE sub_name = 'test_subscription' \gset +SELECT spock.reset_subscription_stats(:origin_test_sub_id); + reset_subscription_stats +-------------------------- + +(1 row) + +\endif \c :provider_dsn SELECT spock.replicate_ddl($$ CREATE TABLE users (id int PRIMARY KEY, mgr_id int); @@ -82,6 +95,16 @@ SELECT operation, table_name FROM spock.exception_log; UPDATE | users (1 row) +-- Verify UPDATE_MISSING stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_update_missing +FROM spock.get_subscription_stats(:origin_test_sub_id); + confl_update_missing +---------------------- + 1 +(1 row) + +\endif \c :provider_dsn -- This will create a conflict on the subscriber DELETE FROM users where id = 3; @@ -101,6 +124,16 @@ SELECT conflict_type FROM spock.resolutions delete_missing (1 row) +-- Verify DELETE_MISSING stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_delete_missing +FROM spock.get_subscription_stats(:origin_test_sub_id); + confl_delete_missing +---------------------- + 1 +(1 row) + +\endif -- Clear out for next test TRUNCATE spock.resolutions; TRUNCATE spock.exception_log; @@ -154,6 +187,16 @@ SELECT conflict_type, local_tuple FROM spock.resolutions; delete_exists | {"id":3,"mgr_id":333} (1 row) +-- Verify DELETE_EXISTS stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_delete_exists +FROM spock.get_subscription_stats(:origin_test_sub_id); + confl_delete_exists +--------------------- + 1 +(1 row) + +\endif -- Empty SELECT COUNT(1) thecount FROM spock.exception_log; thecount @@ -337,6 +380,16 @@ SELECT conflict_type, conflict_resolution, remote_tuple FROM spock.resolutions; insert_exists | apply_remote | {"id":4,"data":"DD"} (1 row) +-- Verify INSERT_EXISTS stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_insert_exists +FROM spock.get_subscription_stats(:origin_test_sub_id); + confl_insert_exists +--------------------- + 1 +(1 row) + +\endif -- cleanup \c :provider_dsn SELECT * FROM spock.repset_remove_table('default', 'users'); diff --git a/tests/regress/expected/tuple_origin_1.out b/tests/regress/expected/tuple_origin_1.out new file mode 100644 index 00000000..22eb2765 --- /dev/null +++ b/tests/regress/expected/tuple_origin_1.out @@ -0,0 +1,404 @@ +--Tuple Origin +SELECT * FROM spock_regress_variables() +\gset +\c :subscriber_dsn +SELECT current_setting('server_version_num')::int >= 180000 AS has_conflict_stats \gset +\c :provider_dsn +ALTER SYSTEM SET spock.save_resolutions = on; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +\c :subscriber_dsn +TRUNCATE spock.resolutions; +-- Reset conflict stats (PG18+ only) +\if :has_conflict_stats +SELECT sub_id AS origin_test_sub_id FROM spock.subscription + WHERE sub_name = 'test_subscription' \gset +SELECT spock.reset_subscription_stats(:origin_test_sub_id); +\endif +\c :provider_dsn +SELECT spock.replicate_ddl($$ + CREATE TABLE users (id int PRIMARY KEY, mgr_id int); +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT * FROM spock.repset_add_table('default', 'users'); + repset_add_table +------------------ + t +(1 row) + +BEGIN; +INSERT INTO USERS SELECT 1, 5; +UPDATE USERS SET id = id + 1 WHERE mgr_id < 10; +UPDATE USERS SET id = id + 1 WHERE mgr_id < 10; +END; +-- Ensure that DDL and updates is confirmed as flushed to the subscriber +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +SELECT * FROM users ORDER BY id; + id | mgr_id +----+-------- + 3 | 5 +(1 row) + +-- Expect 0 rows in spock.resolutions since the origin is the same +SELECT COUNT(*) FROM spock.resolutions + WHERE relname='public.users' + AND local_timestamp = remote_timestamp; + count +------- + 0 +(1 row) + +-- DELETE the row from subscriber first, in order to create a conflict +DELETE FROM users where id = 3; +TRUNCATE spock.resolutions; +TRUNCATE spock.exception_log; +\c :provider_dsn +-- This will create a update_missing conflict on the subscriber, row does not exist +UPDATE users SET mgr_id = 99 WHERE id = 3; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Expect 0 rows in spock.resolutions +SELECT COUNT(*) FROM spock.resolutions; + count +------- + 0 +(1 row) + +-- Expect 1 row in spock.exception_log +SELECT operation, table_name FROM spock.exception_log; + operation | table_name +-----------+------------ + UPDATE | users +(1 row) + +-- Verify UPDATE_MISSING stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_update_missing +FROM spock.get_subscription_stats(:origin_test_sub_id); +\endif +\c :provider_dsn +-- This will create a conflict on the subscriber +DELETE FROM users where id = 3; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Expect 1 row in spock.resolutions with NULL local_timestamp +SELECT conflict_type FROM spock.resolutions + WHERE relname='public.users' + AND local_timestamp IS NULL; + conflict_type +---------------- + delete_missing +(1 row) + +-- Verify DELETE_MISSING stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_delete_missing +FROM spock.get_subscription_stats(:origin_test_sub_id); +\endif +-- Clear out for next test +TRUNCATE spock.resolutions; +TRUNCATE spock.exception_log; +\c :provider_dsn +INSERT INTO users VALUES (3, 33); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +-- Set up test. +-- Delete on n1 with delay, update on n2, wait, query to see conflict +-- Add 2000ms delay to the output plugin +ALTER SYSTEM SET spock.output_delay = 2000; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +-- Let it take effect +SELECT pg_sleep(1); + pg_sleep +---------- + +(1 row) + +DELETE FROM users WHERE id = 3; +\c :subscriber_dsn +-- With the 2 second delay, we should see the row +SELECT * FROM users; + id | mgr_id +----+-------- + 3 | 33 +(1 row) + +UPDATE users SET mgr_id = 333 WHERE id = 3; +-- Wait for delay time to pass with a 1 second buffer +-- so that the delayed DELETE finally arrives (late) +SELECT pg_sleep(3); + pg_sleep +---------- + +(1 row) + +-- We should see one resolution, delete_exists +SELECT conflict_type, local_tuple FROM spock.resolutions; + conflict_type | local_tuple +---------------+----------------------- + delete_exists | {"id":3,"mgr_id":333} +(1 row) + +-- Verify DELETE_EXISTS stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_delete_exists +FROM spock.get_subscription_stats(:origin_test_sub_id); +\endif +-- Empty +SELECT COUNT(1) thecount FROM spock.exception_log; + thecount +---------- + 0 +(1 row) + +SHOW spock.output_delay; + spock.output_delay +-------------------- + 2000 +(1 row) + +-- Reset delay +ALTER SYSTEM SET spock.output_delay = 0; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +-- More tests +\c :provider_dsn +SELECT spock.replicate_ddl($$ + CREATE TABLE basic_conflict ( + id int primary key, + data text); +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT * FROM spock.repset_add_table('default', 'basic_conflict'); + repset_add_table +------------------ + t +(1 row) + +INSERT INTO basic_conflict VALUES (1, 'A'), (2, 'B'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +TRUNCATE spock.resolutions; +SELECT * FROM basic_conflict ORDER BY id; + id | data +----+------ + 1 | A + 2 | B +(2 rows) + +\c :provider_dsn +UPDATE basic_conflict SET data = 'AAA' WHERE id = 1; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +SELECT * FROM basic_conflict ORDER BY id; + id | data +----+------ + 1 | AAA + 2 | B +(2 rows) + +--- should return nothing +SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict'; + relname | conflict_type +---------+--------------- +(0 rows) + +-- now update row locally to set up an origin difference +UPDATE basic_conflict SET data = 'sub-A' WHERE id = 1; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :provider_dsn +-- Update on provider again so subscriber will see +-- an origin different from its local one +UPDATE basic_conflict SET data = 'pub-A' WHERE id = 1; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +SELECT * FROM basic_conflict ORDER BY id; + id | data +----+------- + 1 | pub-A + 2 | B +(2 rows) + +-- Origin changes are no longer saved to resolutions +SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict'; + relname | conflict_type +---------+--------------- +(0 rows) + +-- Clean +TRUNCATE spock.resolutions; +-- Test delete_origin_differs: update a row locally on the subscriber to +-- establish a different (local) origin, then delete it from the provider. +UPDATE basic_conflict SET data = 'sub-B' WHERE id = 2; +\c :provider_dsn +DELETE FROM basic_conflict WHERE id = 2; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- The row should be deleted (delete applied regardless of origin difference) +SELECT * FROM basic_conflict ORDER BY id; + id | data +----+------- + 1 | pub-A +(1 row) + +-- delete_origin_differs is not saved to resolutions (same as update_origin_differs) +SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict'; + relname | conflict_type +---------+--------------- +(0 rows) + +TRUNCATE spock.resolutions; +\c :provider_dsn +-- Do update in same transaction as INSERT +BEGIN; +INSERT INTO basic_conflict VALUES (3, 'C'); +UPDATE basic_conflict SET data = 'pub-C' WHERE id = 3; +COMMIT; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +SELECT * FROM basic_conflict ORDER BY id; + id | data +----+------- + 1 | pub-A + 3 | pub-C +(2 rows) + +-- We should not see a conflict +SELECT relname, conflict_type FROM spock.resolutions WHERE relname = 'public.basic_conflict'; + relname | conflict_type +---------+--------------- +(0 rows) + +-- insert_exists check. Add a row to conflict with +TRUNCATE spock.exception_log; +TRUNCATE spock.resolutions; +INSERT INTO basic_conflict VALUES (4, 'D'); +\c :provider_dsn +INSERT INTO basic_conflict VALUES (4, 'DD'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- The insert gets converted into an update, conflict type insert_exists +SELECT conflict_type, conflict_resolution, remote_tuple FROM spock.resolutions; + conflict_type | conflict_resolution | remote_tuple +---------------+---------------------+---------------------- + insert_exists | apply_remote | {"id":4,"data":"DD"} +(1 row) + +-- Verify INSERT_EXISTS stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_insert_exists +FROM spock.get_subscription_stats(:origin_test_sub_id); +\endif +-- cleanup +\c :provider_dsn +SELECT * FROM spock.repset_remove_table('default', 'users'); + repset_remove_table +--------------------- + t +(1 row) + +SELECT * FROM spock.repset_remove_table('default', 'basic_conflict'); + repset_remove_table +--------------------- + t +(1 row) + +SELECT spock.replicate_ddl($$ + DROP TABLE users CASCADE; +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT spock.replicate_ddl($$ + DROP TABLE basic_conflict; +$$); + replicate_ddl +--------------- + t +(1 row) + +ALTER SYSTEM SET spock.save_resolutions = off; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + diff --git a/tests/regress/sql/conflict_stat.sql b/tests/regress/sql/conflict_stat.sql index 90da6ef8..7ffb5ccb 100644 --- a/tests/regress/sql/conflict_stat.sql +++ b/tests/regress/sql/conflict_stat.sql @@ -80,11 +80,66 @@ FROM spock.get_subscription_stats(:test_sub_id); -- Test reset: clear the stats and verify counter goes back to zero SELECT spock.reset_subscription_stats(:test_sub_id); +-- ============================================================ +-- Test INSERT_EXISTS: insert a row on subscriber, then insert the same key on +-- provider. The apply worker detects the duplicate and resolves the conflict +-- (last_update_wins converts the insert into an update). +-- ============================================================ + +-- Re-seed rows so both sides have data again +\c :provider_dsn +INSERT INTO conflict_stat_test VALUES (10, 'provider10'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- Pre-insert a conflicting row on the subscriber +INSERT INTO conflict_stat_test VALUES (20, 'sub-only'); +TRUNCATE spock.exception_log; + +\c :provider_dsn + +-- This INSERT will conflict with the row already on subscriber +INSERT INTO conflict_stat_test VALUES (20, 'from-provider'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- The row should now reflect the resolved value (remote wins) +SELECT * FROM conflict_stat_test WHERE id = 20; + +-- Verify INSERT_EXISTS counter incremented +SELECT confl_insert_exists +FROM spock.get_subscription_stats(:test_sub_id); +SELECT spock.reset_subscription_stats(:test_sub_id); + +-- ============================================================ +-- Test DELETE_MISSING: delete a row on subscriber first, then delete the same +-- row on provider. The apply worker cannot find the row and reports +-- DELETE_MISSING. +-- ============================================================ +TRUNCATE spock.exception_log; + +-- Remove the row on subscriber before provider sends its DELETE +DELETE FROM conflict_stat_test WHERE id = 10; + +\c :provider_dsn + +DELETE FROM conflict_stat_test WHERE id = 10; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- Row should still be absent +SELECT * FROM conflict_stat_test WHERE id = 10; + SELECT confl_update_missing, confl_insert_exists,confl_update_origin_differs,confl_update_exists, confl_delete_origin_differs,confl_delete_missing,confl_delete_exists FROM spock.get_subscription_stats(:test_sub_id); +SELECT spock.reset_subscription_stats(:test_sub_id); + -- Cleanup TRUNCATE spock.exception_log; \c :provider_dsn diff --git a/tests/regress/sql/tuple_origin.sql b/tests/regress/sql/tuple_origin.sql index 0a9cceeb..984f2ae0 100644 --- a/tests/regress/sql/tuple_origin.sql +++ b/tests/regress/sql/tuple_origin.sql @@ -2,6 +2,9 @@ SELECT * FROM spock_regress_variables() \gset +\c :subscriber_dsn +SELECT current_setting('server_version_num')::int >= 180000 AS has_conflict_stats \gset + \c :provider_dsn ALTER SYSTEM SET spock.save_resolutions = on; SELECT pg_reload_conf(); @@ -9,6 +12,13 @@ SELECT pg_reload_conf(); \c :subscriber_dsn TRUNCATE spock.resolutions; +-- Reset conflict stats (PG18+ only) +\if :has_conflict_stats +SELECT sub_id AS origin_test_sub_id FROM spock.subscription + WHERE sub_name = 'test_subscription' \gset +SELECT spock.reset_subscription_stats(:origin_test_sub_id); +\endif + \c :provider_dsn SELECT spock.replicate_ddl($$ CREATE TABLE users (id int PRIMARY KEY, mgr_id int); @@ -47,6 +57,12 @@ SELECT COUNT(*) FROM spock.resolutions; -- Expect 1 row in spock.exception_log SELECT operation, table_name FROM spock.exception_log; +-- Verify UPDATE_MISSING stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_update_missing +FROM spock.get_subscription_stats(:origin_test_sub_id); +\endif + \c :provider_dsn -- This will create a conflict on the subscriber DELETE FROM users where id = 3; @@ -58,6 +74,12 @@ SELECT conflict_type FROM spock.resolutions WHERE relname='public.users' AND local_timestamp IS NULL; +-- Verify DELETE_MISSING stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_delete_missing +FROM spock.get_subscription_stats(:origin_test_sub_id); +\endif + -- Clear out for next test TRUNCATE spock.resolutions; TRUNCATE spock.exception_log; @@ -93,6 +115,12 @@ SELECT pg_sleep(3); -- We should see one resolution, delete_exists SELECT conflict_type, local_tuple FROM spock.resolutions; +-- Verify DELETE_EXISTS stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_delete_exists +FROM spock.get_subscription_stats(:origin_test_sub_id); +\endif + -- Empty SELECT COUNT(1) thecount FROM spock.exception_log; @@ -200,6 +228,12 @@ SELECT spock.wait_slot_confirm_lsn(NULL, NULL); -- The insert gets converted into an update, conflict type insert_exists SELECT conflict_type, conflict_resolution, remote_tuple FROM spock.resolutions; +-- Verify INSERT_EXISTS stat counter (PG18+ only) +\if :has_conflict_stats +SELECT confl_insert_exists +FROM spock.get_subscription_stats(:origin_test_sub_id); +\endif + -- cleanup \c :provider_dsn SELECT * FROM spock.repset_remove_table('default', 'users'); From b7bd52f7b0474a3e2624a4ef13d1a8eac668ef2f Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Tue, 24 Mar 2026 09:49:31 +0100 Subject: [PATCH 7/7] Review fixes. Simplify PG18-only test gating using filter-out in Makefile. Replace the REGRESS_PG18 variable and ifeq block with a filter-out on PG < 18, matching the existing pattern used for add_table. Also remove duplicate extern declarations in spock_conflict.h (already declared earlier in the same file). --- Makefile | 13 ++++++------- include/spock_conflict.h | 4 ---- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 3584a839..6ecf34b3 100644 --- a/Makefile +++ b/Makefile @@ -50,14 +50,8 @@ all: spock.control # ----------------------------------------------------------------------------- # Regression tests # ----------------------------------------------------------------------------- -# PG18+ only tests -REGRESS_PG18 = -ifeq ($(shell test $(PGVER) -ge 18 && echo yes),yes) -REGRESS_PG18 = conflict_stat -endif - REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondary_unique \ - excluded_schema $(REGRESS_PG18) \ + excluded_schema conflict_stat \ toasted replication_set matview bidirectional primary_key \ interfaces foreign_key copy sequence triggers parallel functions row_filter \ row_filter_sampling att_list column_filter apply_delay \ @@ -71,6 +65,11 @@ REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondar # cases while developing. REGRESS := $(filter-out add_table, $(REGRESS)) +# Filter out PG18-only tests on older versions +ifneq ($(shell test $(PGVER) -ge 18 && echo yes),yes) +REGRESS := $(filter-out conflict_stat, $(REGRESS)) +endif + # For regression checks # this makes "make check" give a useful error abs_top_builddir = . diff --git a/include/spock_conflict.h b/include/spock_conflict.h index 994ad8ab..ba196579 100644 --- a/include/spock_conflict.h +++ b/include/spock_conflict.h @@ -98,10 +98,6 @@ spock_conflict_stat_index(SpockConflictType type) return -1; } -extern int spock_conflict_resolver; -extern int spock_conflict_log_level; -extern bool spock_save_resolutions; - typedef enum { /* do not log */