diff --git a/Makefile b/Makefile index 6ecf34b3..c0b0f9aa 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,7 @@ all: spock.control # ----------------------------------------------------------------------------- REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondary_unique \ excluded_schema conflict_stat \ - toasted replication_set matview bidirectional primary_key \ + toasted replication_set exception_row_capture matview bidirectional primary_key \ interfaces foreign_key copy sequence triggers parallel functions row_filter \ row_filter_sampling att_list column_filter apply_delay \ extended node_origin_cascade multiple_upstreams tuple_origin autoddl \ diff --git a/include/spock_exception_handler.h b/include/spock_exception_handler.h index 7997880d..91f65e17 100644 --- a/include/spock_exception_handler.h +++ b/include/spock_exception_handler.h @@ -60,6 +60,7 @@ typedef struct SpockExceptionLog HeapTuple local_tuple; char initial_error_message[1024]; char initial_operation[16]; + uint32 failed_action; /* xact_action_counter at time of error */ } SpockExceptionLog; typedef enum SpockExceptionBehaviour @@ -99,4 +100,5 @@ extern void spock_disable_subscription(SpockSubscription *sub, XLogRecPtr lsn, TimestampTz ts); + #endif /* SPOCK_EXCEPTION_HANDLER_H */ diff --git a/src/spock_apply.c b/src/spock_apply.c index 46d7d96d..19c7fb19 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -435,6 +435,23 @@ begin_replication_step(void) { StartTransactionCommand(); spock_apply_heap_begin(); + + /* + * In TRANSDISCARD/SUB_DISABLE mode, set the transaction + * read-only to prevent any actual DML from being applied. + * Direct catalog writes (exception_log entries) are still + * allowed. 
+ */ + + if (MyApplyWorker->use_try_block && + (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE)) + { + set_config_option("transaction_read_only", "on", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + } + result = true; } @@ -769,8 +786,15 @@ handle_commit(StringInfo s) * be skipped and made it unavailable when re-enabling the * subscription. Skipping such transactions should be an explicit user * action via spock.sub_alter_skiplsn. + * + * For SUB_DISABLE mode during a retry (use_try_block), do not advance + * the LSN even if the replay succeeded. This allows the transaction + * to be re-applied after the user fixes the root cause and re-enables + * the subscription. */ - if (!xact_had_exception || + if ((!xact_had_exception && + !(MyApplyWorker->use_try_block && + exception_behaviour == SUB_DISABLE)) || exception_behaviour == DISCARD || exception_behaviour == TRANSDISCARD) { @@ -792,48 +816,23 @@ handle_commit(StringInfo s) exception_behaviour == SUB_DISABLE)) { SpockExceptionLog *exception_log; - char errmsg[512]; exception_log = &exception_log_ptr[my_exception_log_index]; /* - * All operations were already rolled back in subtransactions (by - * RollbackAndReleaseCurrentSubTransaction in handle_insert/ - * update/delete). Abort the parent transaction to discard it - * entirely. - */ - AbortCurrentTransaction(); - - /* - * Start a new transaction to log the discard and update progress. + * In TRANSDISCARD/SUB_DISABLE mode, DML operations were never + * attempted — they were skipped and logged to exception_log. + * Let the transaction commit normally so those entries are + * preserved. */ - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - /* - * Log this transaction as discarded to the exception_log so - * there's an audit trail. Include the original error message if - * we have it. 
- */ - snprintf(errmsg, sizeof(errmsg), - "%s at LSN %X/%X%s%s", - (exception_behaviour == TRANSDISCARD) - ? "Transaction discarded in TRANSDISCARD mode" - : "Transaction failed, subscription will be disabled", - LSN_FORMAT_ARGS(end_lsn), - exception_log->initial_error_message[0] != '\0' ? ". Initial error: " : "", - exception_log->initial_error_message[0] != '\0' ? exception_log->initial_error_message : ""); - - add_entry_to_exception_log(remote_origin_id, - commit_time, - remote_xid, - 0, 0, - NULL, NULL, NULL, NULL, - NULL, NULL, - exception_log->initial_operation, - errmsg); - - elog(LOG, "SPOCK %s: %s", MySubscription->name, errmsg); + elog(LOG, "SPOCK %s: %s at LSN %X/%X%s%s", + MySubscription->name, + (exception_behaviour == TRANSDISCARD) + ? "Transaction discarded in TRANSDISCARD mode" + : "Transaction failed, subscription will be disabled", + LSN_FORMAT_ARGS(end_lsn), + exception_log->initial_error_message[0] != '\0' ? ". Initial error: " : "", + exception_log->initial_error_message[0] != '\0' ? exception_log->initial_error_message : ""); /* * Clear the exception state so we don't enter exception handling @@ -842,31 +841,9 @@ handle_commit(StringInfo s) exception_log->commit_lsn = InvalidXLogRecPtr; exception_log->initial_error_message[0] = '\0'; MySpockWorker->restart_delay = 0; - PopActiveSnapshot(); - CommitTransactionCommand(); - /* - * For SUB_DISABLE mode, throw an error to trigger subscription - * disable in the parent PG_CATCH block. The transaction failure - * is already logged above. - */ - if (exception_behaviour == SUB_DISABLE) - { - elog(ERROR, "SPOCK %s: disabling subscription due to exception in SUB_DISABLE mode", - MySubscription->name); - } - - /* - * Switch to MessageContext before continuing. The progress - * tracking code at transdiscard_skip_commit expects - * MessageContext. - */ - MemoryContextSwitchTo(MessageContext); - - /* - * Skip the normal commit path - jump to progress tracking. 
- */ - goto transdiscard_skip_commit; + /* Defensive check */ + Assert(XactReadOnly); } /* Have the commit code adjust our logical clock if needed */ @@ -881,24 +858,24 @@ handle_commit(StringInfo s) MemoryContextSwitchTo(TopMemoryContext); - if (xact_had_exception) + if (exception_behaviour == SUB_DISABLE && + (xact_had_exception || MyApplyWorker->use_try_block)) { /* - * If we had exception(s) and are in SUB_DISABLE mode then the - * subscription got disabled earlier in the code path. We need to - * exit here to disconnect. + * SUB_DISABLE: after committing exception_log entries, throw + * an ERROR to trigger subscription disable in the PG_CATCH + * block. This covers both the case where DML actually failed + * (xact_had_exception) and the retry path where all DML was + * skipped but the original error was logged (use_try_block). */ - if (exception_behaviour == SUB_DISABLE) - { - SpockExceptionLog *exception_log; + SpockExceptionLog *exception_log; - exception_log = &exception_log_ptr[my_exception_log_index]; - exception_log->commit_lsn = InvalidXLogRecPtr; - MySpockWorker->restart_delay = 0; + exception_log = &exception_log_ptr[my_exception_log_index]; + exception_log->commit_lsn = InvalidXLogRecPtr; + MySpockWorker->restart_delay = 0; - elog(ERROR, "SPOCK %s: exiting because subscription disabled", - MySubscription->name); - } + elog(ERROR, "SPOCK %s: disabling subscription due to exception in SUB_DISABLE mode", + MySubscription->name); } else if (MyApplyWorker->use_try_block && exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') @@ -945,7 +922,6 @@ handle_commit(StringInfo s) */ maybe_advance_forwarded_origin(end_lsn, xact_had_exception); -transdiscard_skip_commit: /* Update the entry in the progress table. 
*/ elog(DEBUG1, "SPOCK %s: updating progress table for node_id %d" \ " and remote node id %d with remote commit ts" \ @@ -1195,6 +1171,11 @@ log_insert_exception(bool failed, char *errmsg, SpockRelation *rel, errmsg); } +/* + * All memory operations of this function are performed under the + * ApplyOperationContext's umbrella: in case of an error, the necessary data is + * copied into a more stable memory context in the upper CATCH section. + */ static void handle_insert(StringInfo s) { @@ -1211,10 +1192,10 @@ handle_insert(StringInfo s) if (is_skipping_changes()) return; - oldcontext = MemoryContextSwitchTo(ApplyOperationContext); - started_tx = begin_replication_step(); + oldcontext = MemoryContextSwitchTo(ApplyOperationContext); + rel = spock_read_insert(s, RowExclusiveLock, &newtup); if (unlikely(rel == NULL)) { @@ -1235,9 +1216,9 @@ handle_insert(StringInfo s) log_insert_exception(true, "Spock can't find relation", NULL, NULL, NULL, "INSERT"); - end_replication_step(); MemoryContextSwitchTo(oldcontext); MemoryContextReset(ApplyOperationContext); + end_replication_step(); return; } @@ -1249,9 +1230,9 @@ handle_insert(StringInfo s) if (!should_apply_changes_for_rel(rel->nspname, rel->relname)) { spock_relation_close(rel, NoLock); - end_replication_step(); MemoryContextSwitchTo(oldcontext); MemoryContextReset(ApplyOperationContext); + end_replication_step(); return; } @@ -1271,6 +1252,9 @@ handle_insert(StringInfo s) spock_apply_heap_mi_add_tuple(rel, &newtup); last_insert_rel_cnt++; + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(ApplyOperationContext); + /* * Close replication step to satisfy corresponding 'begin' routine. * TODO: multi-insert code should be revised one day: it is not @@ -1280,9 +1264,6 @@ handle_insert(StringInfo s) * this tuple and what's then? 
*/ end_replication_step(); - - MemoryContextSwitchTo(oldcontext); - MemoryContextReset(ApplyOperationContext); return; } } @@ -1308,50 +1289,83 @@ handle_insert(StringInfo s) /* TODO: Handle multiple inserts */ if (MyApplyWorker->use_try_block) { - PG_TRY(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { + /* + * TRANSDISCARD and SUB_DISABLE: skip the DML and log the + * discarded operation directly to spock.exception_log. The + * transaction is read-only, but exception_log has + * user_catalog_table=true so CatalogTupleInsert works. + * + * Only the record that originally caused the error gets the + * real error message; other records get NULL. + */ + char *error_msg = + (xact_action_counter == + exception_log_ptr[my_exception_log_index].failed_action && + exception_log_ptr[my_exception_log_index].initial_error_message[0]) ? + exception_log_ptr[my_exception_log_index].initial_error_message : + NULL; + exception_command_counter++; - BeginInternalSubTransaction(NULL); - spock_apply_heap_insert(rel, &newtup); + exception_log_ptr[my_exception_log_index].local_tuple = NULL; + log_insert_exception(false, error_msg, rel, NULL, &newtup, "INSERT"); } - PG_CATCH(); + else { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + /* DISCARD mode: try block with subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + spock_apply_heap_insert(rel, &newtup); + ReleaseCurrentSubTransaction(); + } + PG_CATCH(); + { + /* Set per-operation error flag */ + failed = true; + /* Set transaction-wide error flag */ + xact_had_exception = true; - if (!failed) - { - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + MemoryContextSwitchTo(ApplyOperationContext); + edata = CopyErrorData(); + + FlushErrorState(); RollbackAndReleaseCurrentSubTransaction(); - else - 
ReleaseCurrentSubTransaction(); - } + } + PG_END_TRY(); - /* - * Log the exception. If this operation succeeded but we have an - * initial error message (from a previous attempt), use that instead - * of NULL to provide context for why we're logging this. - */ - { - char *error_msg = edata ? edata->message : - (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? - exception_log_ptr[my_exception_log_index].initial_error_message : NULL); + /* + * Rollback switches to the parent transaction context; + * restore ApplyOperationContext for the code below. + */ + MemoryContextSwitchTo(ApplyOperationContext); - log_insert_exception(failed, error_msg, rel, NULL, &newtup, "INSERT"); + if (failed) + { + /* + * Need to keep this database operation out of the CATCH section + * to avoid FATAL error in case if an ERROR happens there. + */ + log_insert_exception(true, edata->message, rel, + NULL, &newtup, "INSERT"); + } } } else { - MemoryContextSwitchTo(ApplyOperationContext); spock_apply_heap_insert(rel, &newtup); - MemoryContextSwitchTo(oldcontext); } + /* + * DML operation is finished. Be paranoid and check memory context before + * switching out and cleaning the per-operation memory context + */ + Assert(CurrentMemoryContext == ApplyOperationContext); + MemoryContextSwitchTo(oldcontext); + /* if INSERT was into our queue, process the message. 
*/ if (RelationGetRelid(rel->rel) == QueueRelid) { @@ -1361,8 +1375,6 @@ handle_insert(StringInfo s) multi_insert_finish(); - MemoryContextSwitchTo(ApplyOperationContext); - ht = heap_form_tuple(RelationGetDescr(rel->rel), newtup.values, newtup.nulls); @@ -1384,14 +1396,13 @@ handle_insert(StringInfo s) table_close(qrel, NoLock); spock_apply_heap_begin(); - MemoryContextSwitchTo(MessageContext); } else { spock_relation_close(rel, NoLock); end_replication_step(); } - MemoryContextSwitchTo(MessageContext); + MemoryContextReset(ApplyOperationContext); } @@ -1477,42 +1488,62 @@ handle_update(StringInfo s) if (MyApplyWorker->use_try_block == true) { - PG_TRY(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { + /* + * TRANSDISCARD and SUB_DISABLE: skip the DML and log the + * discarded operation directly to spock.exception_log. + */ + char *error_msg = + (xact_action_counter == + exception_log_ptr[my_exception_log_index].failed_action && + exception_log_ptr[my_exception_log_index].initial_error_message[0]) ? + exception_log_ptr[my_exception_log_index].initial_error_message : + NULL; + exception_command_counter++; - BeginInternalSubTransaction(NULL); - spock_apply_heap_update(rel, hasoldtup ? &oldtup : &newtup, &newtup); + exception_log_ptr[my_exception_log_index].local_tuple = NULL; + log_insert_exception(false, error_msg, rel, + hasoldtup ? &oldtup : NULL, &newtup, "UPDATE"); } - PG_CATCH(); + else { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + /* DISCARD mode: try block with subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + spock_apply_heap_update(rel, hasoldtup ? 
&oldtup : &newtup, &newtup); + } + PG_CATCH(); + { + failed = true; + xact_had_exception = true; - if (!failed) - { - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + MemoryContextSwitchTo(ApplyOperationContext); + edata = CopyErrorData(); + + FlushErrorState(); RollbackAndReleaseCurrentSubTransaction(); - else - ReleaseCurrentSubTransaction(); - } - /* - * Log the exception. If this operation succeeded but we have an - * initial error message (from a previous attempt), use that instead - * of NULL to provide context for why we're logging this. - */ - { - char *error_msg = edata ? edata->message : - (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? - exception_log_ptr[my_exception_log_index].initial_error_message : NULL); + /* + * Rollback switches to the parent transaction context; + * restore ApplyOperationContext for the code below. + */ + MemoryContextSwitchTo(ApplyOperationContext); + } + PG_END_TRY(); - log_insert_exception(failed, error_msg, rel, - hasoldtup ? &oldtup : NULL, &newtup, "UPDATE"); + if (!failed) + ReleaseCurrentSubTransaction(); + + if (failed) + { + log_insert_exception(true, edata->message, rel, + hasoldtup ? &oldtup : NULL, &newtup, + "UPDATE"); + } } } else @@ -1523,6 +1554,9 @@ handle_update(StringInfo s) spock_relation_close(rel, NoLock); end_replication_step(); + + /* Free CopyErrorData allocations from the DISCARD-mode PG_CATCH path. */ + MemoryContextReset(ApplyOperationContext); } static void @@ -1584,42 +1618,61 @@ handle_delete(StringInfo s) if (MyApplyWorker->use_try_block) { - PG_TRY(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { + /* + * TRANSDISCARD and SUB_DISABLE: skip the DML and log the + * discarded operation directly to spock.exception_log. + */ + char *error_msg = + (xact_action_counter == + exception_log_ptr[my_exception_log_index].failed_action && + exception_log_ptr[my_exception_log_index].initial_error_message[0]) ? 
+ exception_log_ptr[my_exception_log_index].initial_error_message : + NULL; + exception_command_counter++; - BeginInternalSubTransaction(NULL); - spock_apply_heap_delete(rel, &oldtup); + exception_log_ptr[my_exception_log_index].local_tuple = NULL; + log_insert_exception(false, error_msg, rel, + &oldtup, NULL, "DELETE"); } - PG_CATCH(); + else { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + /* DISCARD mode: try block with subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + spock_apply_heap_delete(rel, &oldtup); + } + PG_CATCH(); + { + failed = true; + xact_had_exception = true; - if (!failed) - { - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + MemoryContextSwitchTo(ApplyOperationContext); + edata = CopyErrorData(); + + FlushErrorState(); RollbackAndReleaseCurrentSubTransaction(); - else - ReleaseCurrentSubTransaction(); - } - /* - * Log the exception. If this operation succeeded but we have an - * initial error message (from a previous attempt), use that instead - * of NULL to provide context for why we're logging this. - */ - { - char *error_msg = edata ? edata->message : - (exception_log_ptr[my_exception_log_index].initial_error_message[0] ? - exception_log_ptr[my_exception_log_index].initial_error_message : NULL); + /* + * Rollback switches to the parent transaction context; + * restore ApplyOperationContext for the code below. 
+ */ + MemoryContextSwitchTo(ApplyOperationContext); + } + PG_END_TRY(); - log_insert_exception(failed, error_msg, rel, - &oldtup, NULL, "DELETE"); + if (!failed) + ReleaseCurrentSubTransaction(); + + if (failed) + { + log_insert_exception(true, edata->message, rel, + &oldtup, NULL, "DELETE"); + } } } else @@ -1630,6 +1683,9 @@ handle_delete(StringInfo s) spock_relation_close(rel, NoLock); end_replication_step(); + + /* Free CopyErrorData allocations from the DISCARD-mode PG_CATCH path. */ + MemoryContextReset(ApplyOperationContext); } /* @@ -1658,8 +1714,33 @@ handle_truncate(StringInfo s) begin_replication_step(); errcallback_arg.action_name = "TRUNCATE"; + xact_action_counter++; remote_relids = spock_read_truncate(s, &cascade, &restart_seqs); + /* + * TRANSDISCARD/SUB_DISABLE: skip the TRUNCATE and log it as discarded. + * ExecuteTruncateGuts is called directly (not via ProcessUtility), so + * transaction_read_only does not protect against it — we must skip + * explicitly, matching the pattern in handle_insert/update/delete. + */ + if (MyApplyWorker->use_try_block && + (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE)) + { + char *error_msg = + (xact_action_counter == + exception_log_ptr[my_exception_log_index].failed_action && + exception_log_ptr[my_exception_log_index].initial_error_message[0]) ? 
+ exception_log_ptr[my_exception_log_index].initial_error_message : + NULL; + + exception_command_counter++; + exception_log_ptr[my_exception_log_index].local_tuple = NULL; + log_insert_exception(false, error_msg, NULL, NULL, NULL, "TRUNCATE"); + end_replication_step(); + return; + } + foreach(lc, remote_relids) { SpockRelation *rel; @@ -2251,46 +2332,69 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started) if (MyApplyWorker->use_try_block) { - PG_TRY(); + if (exception_behaviour == TRANSDISCARD || + exception_behaviour == SUB_DISABLE) { + /* + * TRANSDISCARD and SUB_DISABLE: skip the DDL, just extract SQL + * for logging below. JsonbToCString is simpler than the full + * JSONB iterator extraction used in handle_sql(); the result + * includes JSON quoting but that's acceptable for a log entry + * (handle_sql() needs the raw string for execution). + */ + sql = JsonbToCString(NULL, + &queued_message->message->root, 0); exception_command_counter++; - BeginInternalSubTransaction(NULL); - handle_sql(queued_message, tx_just_started, &sql); + failed = false; } - PG_CATCH(); + else { - failed = true; - RollbackAndReleaseCurrentSubTransaction(); - edata = CopyErrorData(); - xact_had_exception = true; - } - PG_END_TRY(); + /* DISCARD mode: try block with subtransactions */ + PG_TRY(); + { + exception_command_counter++; + BeginInternalSubTransaction(NULL); + handle_sql(queued_message, tx_just_started, &sql); + } + PG_CATCH(); + { + failed = true; + xact_had_exception = true; - if (!failed) - { - /* - * Follow spock.exception_behavior GUC instead of restarting - * worker - */ - if (exception_behaviour == TRANSDISCARD || - exception_behaviour == SUB_DISABLE) + MemoryContextSwitchTo(ApplyOperationContext); + edata = CopyErrorData(); + + FlushErrorState(); RollbackAndReleaseCurrentSubTransaction(); - else + + /* + * Rollback switches to the parent transaction context; + * restore ApplyOperationContext for the code below. 
+ */ + MemoryContextSwitchTo(ApplyOperationContext); + } + PG_END_TRY(); + + if (!failed) ReleaseCurrentSubTransaction(); } - /* Let's create an exception log entry if true. */ if (should_log_exception(failed)) { - /* - * Use current error message if operation failed, otherwise use - * initial_error_message for context (e.g., in DISCARD mode when - * SQL succeeds but we're logging it because of a previous error). - */ - char *error_msg = failed ? edata->message : - (my_exception_log_index >= 0 && - exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0' ? - exception_log_ptr[my_exception_log_index].initial_error_message : NULL); + char *error_msg; + + /* Just to be paranoid */ + Assert(my_exception_log_index >= 0); + + if (failed) + error_msg = edata->message; + else + error_msg = + (xact_action_counter == + exception_log_ptr[my_exception_log_index].failed_action && + exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') ? + exception_log_ptr[my_exception_log_index].initial_error_message : + NULL; add_entry_to_exception_log(remote_origin_id, replorigin_session_origin_timestamp, @@ -2308,6 +2412,9 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started) } end_replication_step(); + + /* Free CopyErrorData allocations from the DISCARD-mode PG_CATCH path. */ + MemoryContextReset(ApplyOperationContext); } /* @@ -3223,6 +3330,14 @@ apply_work(PGconn *streamConn) sizeof(exception_log_ptr[my_exception_log_index].initial_operation), "%s", errcallback_arg.action_name ? errcallback_arg.action_name : "UNKNOWN"); + + /* + * Remember which action in the transaction triggered the error. + * During the read-only replay, only this action gets the real + * error message; other records get NULL. 
+ */ + exception_log_ptr[my_exception_log_index].failed_action = + xact_action_counter; } FlushErrorState(); @@ -3251,7 +3366,6 @@ apply_work(PGconn *streamConn) if (need_replay) { MyApplyWorker->use_try_block = true; - goto stream_replay; } diff --git a/src/spock_exception_handler.c b/src/spock_exception_handler.c index 09cd2cd0..811bf469 100644 --- a/src/spock_exception_handler.c +++ b/src/spock_exception_handler.c @@ -188,12 +188,8 @@ add_entry_to_exception_log(Oid remote_origin, TimestampTz remote_commit_ts, values[Anum_exception_log_ddl_user - 1] = CStringGetTextDatum(ddl_user); } - /* - * The error_message column of the spock.exception_log table is marked as - * NOT NULL, but we don't always have a valid error message. - */ if (error_message == NULL) - values[Anum_exception_log_error_message - 1] = CStringGetTextDatum("unknown"); + values[Anum_exception_log_error_message - 1] = CStringGetTextDatum("unavailable"); else values[Anum_exception_log_error_message - 1] = CStringGetTextDatum(error_message); values[Anum_exception_log_retry_errored_at - 1] = TimestampTzGetDatum(GetCurrentTimestamp()); diff --git a/tests/regress/expected/exception_row_capture.out b/tests/regress/expected/exception_row_capture.out new file mode 100644 index 00000000..9d9c016b --- /dev/null +++ b/tests/regress/expected/exception_row_capture.out @@ -0,0 +1,534 @@ +-- +-- Test: Exception behaviour modes (DISCARD, TRANSDISCARD, SUB_DISABLE) +-- +-- Common scenario: three tables on provider, one broken on subscriber. +-- A single transaction with DMLs on all three tables triggers an error +-- on the broken table. The first DML (INSERT into drl_t1) also creates +-- a conflict (INSERT_EXISTS) to verify that exception row capture modes +-- do not log it to spock.resolutions, while DISCARD mode does. 
+-- +-- Each mode uses a different breakage method: +-- TRANSDISCARD: absent table (DROP TABLE on subscriber) +-- DISCARD: truncated table (TRUNCATE on subscriber, row missing) +-- SUB_DISABLE: deleted row (DELETE on subscriber, row missing) +-- +SELECT * FROM spock_regress_variables() +\gset +-- ============================================================ +-- Setup: create the three tables on the provider, enable +-- resolution logging so we can verify resolutions behavior. +-- ============================================================ +\c :provider_dsn +ALTER SYSTEM SET spock.save_resolutions = on; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT spock.replicate_ddl($$ + CREATE TABLE public.drl_t1 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t2 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t3 (x integer PRIMARY KEY); +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT spock.repset_add_table('default', 'drl_t1'); + repset_add_table +------------------ + t +(1 row) + +SELECT spock.repset_add_table('default', 'drl_t2'); + repset_add_table +------------------ + t +(1 row) + +SELECT spock.repset_add_table('default', 'drl_t3'); + repset_add_table +------------------ + t +(1 row) + +INSERT INTO drl_t1 VALUES (0); +INSERT INTO drl_t2 VALUES (0); +INSERT INTO drl_t3 VALUES (0); +SELECT spock.sync_event() AS sync_lsn \gset +-- Verify initial data arrived +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + result +-------- + t +(1 row) + +SELECT * FROM drl_t1; + x +--- + 0 +(1 row) + +SELECT * FROM drl_t2; + x +--- + 0 +(1 row) + +SELECT * FROM drl_t3; + x +--- + 0 +(1 row) + +-- ============================================================ +-- TRANSDISCARD mode (error: absent table) +-- +-- drl_t1: pre-insert row x=1 on subscriber to set up INSERT_EXISTS +-- drl_t2: DROP TABLE to provoke "can't find relation" error +-- 
============================================================ +\c :subscriber_dsn +TRUNCATE spock.exception_log; +TRUNCATE spock.resolutions; +ALTER SYSTEM SET spock.exception_behaviour = 'transdiscard'; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +-- Set up INSERT_EXISTS conflict on drl_t1 +INSERT INTO drl_t1 VALUES (1); +-- Drop table_2 on subscriber to provoke error +DROP TABLE drl_t2; +\c :provider_dsn +BEGIN; +INSERT INTO drl_t1 VALUES (1); -- conflict: INSERT_EXISTS +UPDATE drl_t2 SET x = 1 WHERE x = 0; -- error: missing relation +UPDATE drl_t3 SET x = 1 WHERE x = 0; -- ok +END; +SELECT spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + result +-------- + t +(1 row) + +-- None of the DMLs should have been applied (entire TX discarded) +SELECT * FROM drl_t1 ORDER BY x; + x +--- + 0 + 1 +(2 rows) + +SELECT * FROM drl_t3; + x +--- + 0 +(1 row) + +-- Three records in exception_log; only drl_t2 has a non-NULL error_message. +SELECT table_name, operation, (error_message <> '') AS has_error +FROM spock.exception_log +ORDER BY command_counter; + table_name | operation | has_error +------------+-----------+----------- + drl_t1 | INSERT | t + | UPDATE | t + drl_t3 | UPDATE | t +(3 rows) + +-- Resolutions must be empty: exception row capture never executes DML, +-- so no conflict detection happens. +SELECT COUNT(*) AS resolutions_count FROM spock.resolutions; + resolutions_count +------------------- + 0 +(1 row) + +-- ============================================================ +-- TRANSDISCARD with DDL in the transaction +-- +-- Verify that a queued DDL operation inside a failing transaction +-- produces exactly one exception_log entry, not a duplicate. 
+-- ============================================================ +\c :subscriber_dsn +TRUNCATE spock.exception_log; +TRUNCATE drl_t3; +\c :provider_dsn +BEGIN; +UPDATE drl_t1 SET x = 2 WHERE x = 1; +SELECT spock.replicate_ddl('CREATE TABLE IF NOT EXISTS public.drl_dummy (x int)'); + replicate_ddl +--------------- + t +(1 row) + +UPDATE drl_t3 SET x = 2 WHERE x = 1; -- error: row missing after TRUNCATE +END; +SELECT spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + result +-------- + t +(1 row) + +-- Expect no duplicate DDL entries: one record per operation. +SELECT table_name, operation, (error_message <> '') AS has_error, ddl_statement +FROM spock.exception_log +ORDER BY command_counter; + table_name | operation | has_error | ddl_statement +------------+-----------+-----------+-------------------------------------------------------------------------------------------- + drl_t1 | UPDATE | t | + queue | INSERT | t | + | DDL | t | "SET search_path TO \"$user\",public; CREATE TABLE IF NOT EXISTS public.drl_dummy (x int)" + drl_t3 | UPDATE | t | +(4 rows) + +-- Check data: +SELECT x FROM drl_t1 WHERE x = 2; -- Record has not been updated + x +--- +(0 rows) + +SELECT * FROM drl_dummy; -- ERROR, table doesn't exist +ERROR: relation "drl_dummy" does not exist +LINE 1: SELECT * FROM drl_dummy; + ^ +-- Cleanup the dummy table +\c :provider_dsn +SELECT spock.replicate_ddl('DROP TABLE IF EXISTS public.drl_dummy'); + replicate_ddl +--------------- + t +(1 row) + +-- ============================================================ +-- Reset for next test +-- ============================================================ +\c :provider_dsn +SELECT spock.replicate_ddl($$ + DROP TABLE IF EXISTS public.drl_t1 CASCADE; + DROP TABLE IF EXISTS public.drl_t2 CASCADE; + DROP TABLE IF EXISTS public.drl_t3 CASCADE; + CREATE TABLE public.drl_t1 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t2 (x integer PRIMARY 
KEY); + CREATE TABLE public.drl_t3 (x integer PRIMARY KEY); +$$); +NOTICE: drop cascades to table drl_t1 membership in replication set default +NOTICE: drop cascades to table drl_t2 membership in replication set default +NOTICE: drop cascades to table drl_t3 membership in replication set default + replicate_ddl +--------------- + t +(1 row) + +SELECT spock.repset_add_table('default', 'drl_t1'); + repset_add_table +------------------ + t +(1 row) + +SELECT spock.repset_add_table('default', 'drl_t2'); + repset_add_table +------------------ + t +(1 row) + +SELECT spock.repset_add_table('default', 'drl_t3'); + repset_add_table +------------------ + t +(1 row) + +INSERT INTO drl_t1 VALUES (0); +INSERT INTO drl_t2 VALUES (0); +INSERT INTO drl_t3 VALUES (0); +SELECT spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + result +-------- + t +(1 row) + +SELECT * FROM drl_t1; + x +--- + 0 +(1 row) + +SELECT * FROM drl_t2; + x +--- + 0 +(1 row) + +SELECT * FROM drl_t3; + x +--- + 0 +(1 row) + +-- ============================================================ +-- DISCARD mode (error: truncated table, row missing) +-- +-- drl_t1: pre-insert row x=1 to set up INSERT_EXISTS conflict +-- drl_t2: TRUNCATE so the UPDATE can't find the row +-- ============================================================ +\c :subscriber_dsn +TRUNCATE spock.exception_log; +TRUNCATE spock.resolutions; +ALTER SYSTEM SET spock.exception_behaviour = 'discard'; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +-- Set up INSERT_EXISTS conflict on drl_t1 +INSERT INTO drl_t1 VALUES (1); +-- Truncate table_2 on subscriber so the UPDATE can't find the row +TRUNCATE drl_t2; +\c :provider_dsn +BEGIN; +INSERT INTO drl_t1 VALUES (1); -- conflict: INSERT_EXISTS (resolved) +UPDATE drl_t2 SET x = 1 WHERE x = 0; -- error: row missing after TRUNCATE +UPDATE drl_t3 SET x = 1 WHERE x = 0; -- ok +END; +SELECT 
spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + result +-------- + t +(1 row) + +-- In DISCARD mode: drl_t1 INSERT conflict resolved, drl_t2 failed, +-- drl_t3 applied +SELECT * FROM drl_t1 ORDER BY x; + x +--- + 0 + 1 +(2 rows) + +SELECT * FROM drl_t2; + x +--- +(0 rows) + +SELECT * FROM drl_t3; + x +--- + 1 +(1 row) + +-- The failed DML (drl_t2) should appear in exception_log +SELECT table_name, operation, (error_message <> '') AS has_error +FROM spock.exception_log +WHERE table_name IS NOT NULL +ORDER BY command_counter; + table_name | operation | has_error +------------+-----------+----------- + drl_t2 | UPDATE | t +(1 row) + +-- Resolutions should contain the INSERT_EXISTS conflict for drl_t1 +SELECT relname, conflict_type FROM spock.resolutions +WHERE relname = 'public.drl_t1'; + relname | conflict_type +---------------+--------------- + public.drl_t1 | insert_exists +(1 row) + +-- ============================================================ +-- Reset for next test +-- ============================================================ +\c :provider_dsn +SELECT spock.replicate_ddl($$ + DROP TABLE IF EXISTS public.drl_t1 CASCADE; + DROP TABLE IF EXISTS public.drl_t2 CASCADE; + DROP TABLE IF EXISTS public.drl_t3 CASCADE; + CREATE TABLE public.drl_t1 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t2 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t3 (x integer PRIMARY KEY); +$$); +NOTICE: drop cascades to table drl_t1 membership in replication set default +NOTICE: drop cascades to table drl_t2 membership in replication set default +NOTICE: drop cascades to table drl_t3 membership in replication set default + replicate_ddl +--------------- + t +(1 row) + +SELECT spock.repset_add_table('default', 'drl_t1'); + repset_add_table +------------------ + t +(1 row) + +SELECT spock.repset_add_table('default', 'drl_t2'); + repset_add_table +------------------ + t +(1 row) + +SELECT 
spock.repset_add_table('default', 'drl_t3'); + repset_add_table +------------------ + t +(1 row) + +INSERT INTO drl_t1 VALUES (0); +INSERT INTO drl_t2 VALUES (0); +INSERT INTO drl_t3 VALUES (0); +SELECT spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + result +-------- + t +(1 row) + +SELECT * FROM drl_t1; + x +--- + 0 +(1 row) + +SELECT * FROM drl_t2; + x +--- + 0 +(1 row) + +SELECT * FROM drl_t3; + x +--- + 0 +(1 row) + +-- ============================================================ +-- SUB_DISABLE mode (error: deleted row) +-- +-- drl_t1: pre-insert row x=1 to set up INSERT_EXISTS conflict +-- drl_t2: DELETE the row so the UPDATE can't find it +-- ============================================================ +\c :subscriber_dsn +TRUNCATE spock.exception_log; +TRUNCATE spock.resolutions; +ALTER SYSTEM SET spock.exception_behaviour = 'sub_disable'; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +-- Set up INSERT_EXISTS conflict on drl_t1 +INSERT INTO drl_t1 VALUES (1); +-- Delete the row from table_2 on subscriber so the UPDATE can't find it +DELETE FROM drl_t2 WHERE x = 0; +\c :provider_dsn +BEGIN; +INSERT INTO drl_t1 VALUES (1); -- conflict: INSERT_EXISTS +UPDATE drl_t2 SET x = 1 WHERE x = 0; -- error: row missing after DELETE +UPDATE drl_t3 SET x = 1 WHERE x = 0; -- ok +END; +-- Fetch the xid of the last UPDATE so we can skip it later +SELECT fetch_last_xid('U') AS remote_xid \gset +\c :subscriber_dsn +-- Subscription should be disabled now +SELECT sub_enabled FROM spock.subscription + WHERE sub_name = 'test_subscription'; + sub_enabled +------------- + f +(1 row) + +-- Three DML records plus one SUB_DISABLE record in exception_log. 
+SELECT table_name, operation, (error_message <> '') AS has_error +FROM spock.exception_log +ORDER BY command_counter; + table_name | operation | has_error +------------+-------------+----------- + drl_t1 | INSERT | t + drl_t2 | UPDATE | t + drl_t3 | UPDATE | t + | SUB_DISABLE | t +(4 rows) + +-- None of the DMLs should have been applied +SELECT * FROM drl_t1 ORDER BY x; + x +--- + 0 + 1 +(2 rows) + +SELECT * FROM drl_t3; + x +--- + 0 +(1 row) + +-- Resolutions must be empty: exception row capture never executes DML +SELECT COUNT(*) AS resolutions_count FROM spock.resolutions; + resolutions_count +------------------- + 0 +(1 row) + +-- Re-enable subscription for cleanup +SELECT skiplsn_and_enable_sub('test_subscription', :remote_xid); + skiplsn_and_enable_sub +------------------------ + +(1 row) + +\c :provider_dsn +SELECT spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + result +-------- + t +(1 row) + +-- ============================================================ +-- Cleanup +-- ============================================================ +ALTER SYSTEM RESET spock.exception_behaviour; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +\c :provider_dsn +ALTER SYSTEM SET spock.save_resolutions = off; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT spock.replicate_ddl($$ + DROP TABLE IF EXISTS public.drl_t1 CASCADE; + DROP TABLE IF EXISTS public.drl_t2 CASCADE; + DROP TABLE IF EXISTS public.drl_t3 CASCADE; +$$); +NOTICE: drop cascades to table drl_t1 membership in replication set default +NOTICE: drop cascades to table drl_t2 membership in replication set default +NOTICE: drop cascades to table drl_t3 membership in replication set default + replicate_ddl +--------------- + t +(1 row) + diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index 5d917efb..985ab0cd 100644 --- 
a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -1,7 +1,11 @@ /* First test whether a table's replication set can be properly manipulated */ SELECT * FROM spock_regress_variables() \gset +-- Cleanup in advance to make the test more stable +\c :subscriber_dsn +TRUNCATE spock.exception_log; \c :provider_dsn +TRUNCATE spock.exception_log; SELECT spock.replicate_ddl($$ CREATE SCHEMA normalschema; CREATE SCHEMA "strange.schema-IS"; @@ -457,15 +461,27 @@ SELECT ) AS error_message FROM spock.exception_log ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; - table_schema | table_name | operation | remote_new_tup | error_message ---------------+------------+-----------+----------------------------------------------------+------------------------------------------ + table_schema | table_name | operation | remote_new_tup | error_message +--------------+------------+-----------+----------------------------------------------------+--------------------------- | | INSERT | | Spock can't find relation | | INSERT | | Spock can't find relation | | INSERT | | Spock can't find relation | | INSERT | | Spock can't find relation - public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid -(6 rows) + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable +(5 rows) + +-- Check exception_log +SELECT table_schema, table_name, operation, remote_new_tup, error_message +FROM spock.exception_log +ORDER BY command_counter; + table_schema | table_name | operation | remote_new_tup | error_message +--------------+------------+-----------+----------------------------------------------------+--------------------------- + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + public | 
spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation +(5 rows) \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g,spoc_102l CASCADE'); @@ -582,11 +598,25 @@ ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; | | INSERT | | Spock can't find relation | | UPDATE | | Spock can't find relation | | UPDATE | | Spock can't find relation - public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable + public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) +(8 rows) + +-- Check exception_log +SELECT table_schema, table_name, operation, remote_new_tup, error_message +FROM spock.exception_log +ORDER BY command_counter; + table_schema | table_name | operation | remote_new_tup | error_message +--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | UPDATE | | Spock can't find relation + | | UPDATE | | Spock can't find relation public | spoc_102l_u | UPDATE | [{"value": -3, "attname": 
"x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) -(10 rows) +(8 rows) \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g_u,spoc_102l_u CASCADE'); diff --git a/tests/regress/sql/exception_row_capture.sql b/tests/regress/sql/exception_row_capture.sql new file mode 100644 index 00000000..b21d36ea --- /dev/null +++ b/tests/regress/sql/exception_row_capture.sql @@ -0,0 +1,282 @@ +-- +-- Test: Exception behaviour modes (DISCARD, TRANSDISCARD, SUB_DISABLE) +-- +-- Common scenario: three tables on provider, one broken on subscriber. +-- A single transaction with DMLs on all three tables triggers an error +-- on the broken table. The first DML (INSERT into drl_t1) also creates +-- a conflict (INSERT_EXISTS) to verify that exception row capture modes +-- do not log it to spock.resolutions, while DISCARD mode does. +-- +-- Each mode uses a different breakage method: +-- TRANSDISCARD: absent table (DROP TABLE on subscriber) +-- DISCARD: truncated table (TRUNCATE on subscriber, row missing) +-- SUB_DISABLE: deleted row (DELETE on subscriber, row missing) +-- +SELECT * FROM spock_regress_variables() +\gset + +-- ============================================================ +-- Setup: create the three tables on the provider, enable +-- resolution logging so we can verify resolutions behavior. 
+-- ============================================================ +\c :provider_dsn +ALTER SYSTEM SET spock.save_resolutions = on; +SELECT pg_reload_conf(); + +SELECT spock.replicate_ddl($$ + CREATE TABLE public.drl_t1 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t2 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t3 (x integer PRIMARY KEY); +$$); +SELECT spock.repset_add_table('default', 'drl_t1'); +SELECT spock.repset_add_table('default', 'drl_t2'); +SELECT spock.repset_add_table('default', 'drl_t3'); + +INSERT INTO drl_t1 VALUES (0); +INSERT INTO drl_t2 VALUES (0); +INSERT INTO drl_t3 VALUES (0); +SELECT spock.sync_event() AS sync_lsn \gset + +-- Verify initial data arrived +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); +SELECT * FROM drl_t1; +SELECT * FROM drl_t2; +SELECT * FROM drl_t3; + +-- ============================================================ +-- TRANSDISCARD mode (error: absent table) +-- +-- drl_t1: pre-insert row x=1 on subscriber to set up INSERT_EXISTS +-- drl_t2: DROP TABLE to provoke "can't find relation" error +-- ============================================================ +\c :subscriber_dsn +TRUNCATE spock.exception_log; +TRUNCATE spock.resolutions; +ALTER SYSTEM SET spock.exception_behaviour = 'transdiscard'; +SELECT pg_reload_conf(); + +-- Set up INSERT_EXISTS conflict on drl_t1 +INSERT INTO drl_t1 VALUES (1); +-- Drop table_2 on subscriber to provoke error +DROP TABLE drl_t2; + +\c :provider_dsn +BEGIN; +INSERT INTO drl_t1 VALUES (1); -- conflict: INSERT_EXISTS +UPDATE drl_t2 SET x = 1 WHERE x = 0; -- error: missing relation +UPDATE drl_t3 SET x = 1 WHERE x = 0; -- ok +END; +SELECT spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + +-- None of the DMLs should have been applied (entire TX discarded) +SELECT * FROM drl_t1 ORDER BY x; +SELECT * FROM drl_t3; + +-- Three records in exception_log; only drl_t2 
has a non-NULL error_message. +SELECT table_name, operation, (error_message <> '') AS has_error +FROM spock.exception_log +ORDER BY command_counter; + +-- Resolutions must be empty: exception row capture never executes DML, +-- so no conflict detection happens. +SELECT COUNT(*) AS resolutions_count FROM spock.resolutions; + +-- ============================================================ +-- TRANSDISCARD with DDL in the transaction +-- +-- Verify that a queued DDL operation inside a failing transaction +-- produces exactly one exception_log entry, not a duplicate. +-- ============================================================ +\c :subscriber_dsn +TRUNCATE spock.exception_log; +TRUNCATE drl_t3; + +\c :provider_dsn +BEGIN; +UPDATE drl_t1 SET x = 2 WHERE x = 1; +SELECT spock.replicate_ddl('CREATE TABLE IF NOT EXISTS public.drl_dummy (x int)'); +UPDATE drl_t3 SET x = 2 WHERE x = 1; -- error: row missing after TRUNCATE +END; +SELECT spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + +-- Expect no duplicate DDL entries: one record per operation. 
+SELECT table_name, operation, (error_message <> '') AS has_error, ddl_statement +FROM spock.exception_log +ORDER BY command_counter; + +-- Check data: +SELECT x FROM drl_t1 WHERE x = 2; -- Record has not been updated +SELECT * FROM drl_dummy; -- ERROR, table doesn't exist + +-- Cleanup the dummy table +\c :provider_dsn +SELECT spock.replicate_ddl('DROP TABLE IF EXISTS public.drl_dummy'); + +-- ============================================================ +-- Reset for next test +-- ============================================================ +\c :provider_dsn +SELECT spock.replicate_ddl($$ + DROP TABLE IF EXISTS public.drl_t1 CASCADE; + DROP TABLE IF EXISTS public.drl_t2 CASCADE; + DROP TABLE IF EXISTS public.drl_t3 CASCADE; + CREATE TABLE public.drl_t1 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t2 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t3 (x integer PRIMARY KEY); +$$); +SELECT spock.repset_add_table('default', 'drl_t1'); +SELECT spock.repset_add_table('default', 'drl_t2'); +SELECT spock.repset_add_table('default', 'drl_t3'); +INSERT INTO drl_t1 VALUES (0); +INSERT INTO drl_t2 VALUES (0); +INSERT INTO drl_t3 VALUES (0); +SELECT spock.sync_event() AS sync_lsn \gset + +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); +SELECT * FROM drl_t1; +SELECT * FROM drl_t2; +SELECT * FROM drl_t3; + +-- ============================================================ +-- DISCARD mode (error: truncated table, row missing) +-- +-- drl_t1: pre-insert row x=1 to set up INSERT_EXISTS conflict +-- drl_t2: TRUNCATE so the UPDATE can't find the row +-- ============================================================ +\c :subscriber_dsn +TRUNCATE spock.exception_log; +TRUNCATE spock.resolutions; +ALTER SYSTEM SET spock.exception_behaviour = 'discard'; +SELECT pg_reload_conf(); + +-- Set up INSERT_EXISTS conflict on drl_t1 +INSERT INTO drl_t1 VALUES (1); +-- Truncate table_2 on subscriber so the UPDATE can't find the row +TRUNCATE 
drl_t2; + +\c :provider_dsn +BEGIN; +INSERT INTO drl_t1 VALUES (1); -- conflict: INSERT_EXISTS (resolved) +UPDATE drl_t2 SET x = 1 WHERE x = 0; -- error: row missing after TRUNCATE +UPDATE drl_t3 SET x = 1 WHERE x = 0; -- ok +END; +SELECT spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + +-- In DISCARD mode: drl_t1 INSERT conflict resolved, drl_t2 failed, +-- drl_t3 applied +SELECT * FROM drl_t1 ORDER BY x; +SELECT * FROM drl_t2; +SELECT * FROM drl_t3; + +-- The failed DML (drl_t2) should appear in exception_log +SELECT table_name, operation, (error_message <> '') AS has_error +FROM spock.exception_log +WHERE table_name IS NOT NULL +ORDER BY command_counter; + +-- Resolutions should contain the INSERT_EXISTS conflict for drl_t1 +SELECT relname, conflict_type FROM spock.resolutions +WHERE relname = 'public.drl_t1'; + +-- ============================================================ +-- Reset for next test +-- ============================================================ +\c :provider_dsn +SELECT spock.replicate_ddl($$ + DROP TABLE IF EXISTS public.drl_t1 CASCADE; + DROP TABLE IF EXISTS public.drl_t2 CASCADE; + DROP TABLE IF EXISTS public.drl_t3 CASCADE; + CREATE TABLE public.drl_t1 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t2 (x integer PRIMARY KEY); + CREATE TABLE public.drl_t3 (x integer PRIMARY KEY); +$$); +SELECT spock.repset_add_table('default', 'drl_t1'); +SELECT spock.repset_add_table('default', 'drl_t2'); +SELECT spock.repset_add_table('default', 'drl_t3'); +INSERT INTO drl_t1 VALUES (0); +INSERT INTO drl_t2 VALUES (0); +INSERT INTO drl_t3 VALUES (0); +SELECT spock.sync_event() AS sync_lsn \gset + +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); +SELECT * FROM drl_t1; +SELECT * FROM drl_t2; +SELECT * FROM drl_t3; + +-- ============================================================ +-- SUB_DISABLE mode (error: deleted row) +-- +-- 
drl_t1: pre-insert row x=1 to set up INSERT_EXISTS conflict +-- drl_t2: DELETE the row so the UPDATE can't find it +-- ============================================================ +\c :subscriber_dsn +TRUNCATE spock.exception_log; +TRUNCATE spock.resolutions; +ALTER SYSTEM SET spock.exception_behaviour = 'sub_disable'; +SELECT pg_reload_conf(); + +-- Set up INSERT_EXISTS conflict on drl_t1 +INSERT INTO drl_t1 VALUES (1); +-- Delete the row from table_2 on subscriber so the UPDATE can't find it +DELETE FROM drl_t2 WHERE x = 0; + +\c :provider_dsn +BEGIN; +INSERT INTO drl_t1 VALUES (1); -- conflict: INSERT_EXISTS +UPDATE drl_t2 SET x = 1 WHERE x = 0; -- error: row missing after DELETE +UPDATE drl_t3 SET x = 1 WHERE x = 0; -- ok +END; + +-- Fetch the xid of the last UPDATE so we can skip it later +SELECT fetch_last_xid('U') AS remote_xid \gset + +\c :subscriber_dsn + +-- Subscription should be disabled now +SELECT sub_enabled FROM spock.subscription + WHERE sub_name = 'test_subscription'; + +-- Three DML records plus one SUB_DISABLE record in exception_log. 
+SELECT table_name, operation, (error_message <> '') AS has_error +FROM spock.exception_log +ORDER BY command_counter; + +-- None of the DMLs should have been applied +SELECT * FROM drl_t1 ORDER BY x; +SELECT * FROM drl_t3; + +-- Resolutions must be empty: exception row capture never executes DML +SELECT COUNT(*) AS resolutions_count FROM spock.resolutions; + +-- Re-enable subscription for cleanup +SELECT skiplsn_and_enable_sub('test_subscription', :remote_xid); + +\c :provider_dsn +SELECT spock.sync_event() AS sync_lsn \gset +\c :subscriber_dsn +CALL spock.wait_for_sync_event(NULL, 'test_provider', :'sync_lsn', 30); + +-- ============================================================ +-- Cleanup +-- ============================================================ +ALTER SYSTEM RESET spock.exception_behaviour; +SELECT pg_reload_conf(); + +\c :provider_dsn +ALTER SYSTEM SET spock.save_resolutions = off; +SELECT pg_reload_conf(); + +SELECT spock.replicate_ddl($$ + DROP TABLE IF EXISTS public.drl_t1 CASCADE; + DROP TABLE IF EXISTS public.drl_t2 CASCADE; + DROP TABLE IF EXISTS public.drl_t3 CASCADE; +$$); diff --git a/tests/regress/sql/replication_set.sql b/tests/regress/sql/replication_set.sql index dae09930..569af22c 100644 --- a/tests/regress/sql/replication_set.sql +++ b/tests/regress/sql/replication_set.sql @@ -2,7 +2,11 @@ SELECT * FROM spock_regress_variables() \gset +-- Cleanup in advance to make the test more stable +\c :subscriber_dsn +TRUNCATE spock.exception_log; \c :provider_dsn +TRUNCATE spock.exception_log; SELECT spock.replicate_ddl($$ CREATE SCHEMA normalschema; @@ -218,6 +222,11 @@ SELECT FROM spock.exception_log ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; +-- Check exception_log +SELECT table_schema, table_name, operation, remote_new_tup, error_message +FROM spock.exception_log +ORDER BY command_counter; + \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g,spoc_102l CASCADE'); @@ -272,6 +281,11 @@ 
SELECT FROM spock.exception_log ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; +-- Check exception_log +SELECT table_schema, table_name, operation, remote_new_tup, error_message +FROM spock.exception_log +ORDER BY command_counter; + \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g_u,spoc_102l_u CASCADE');