diff --git a/.github/workflows/installcheck.yml b/.github/workflows/installcheck.yml index 2ed6825f..59b40fe1 100644 --- a/.github/workflows/installcheck.yml +++ b/.github/workflows/installcheck.yml @@ -53,6 +53,7 @@ jobs: version: latest - name: Start docker cluster + id: start_cluster run: | cd ${GITHUB_WORKSPACE}/tests/docker/ # To minimize regression tests difference, override pgedge.env with @@ -63,6 +64,22 @@ jobs: PGVER=${{ matrix.pgver }} DBUSER=regression DBNAME=regression \ docker compose up --build --wait -d timeout-minutes: 20 + continue-on-error: true + + - name: Diagnose cluster startup failure + if: steps.start_cluster.outcome == 'failure' + run: | + cd ${GITHUB_WORKSPACE}/tests/docker/ + echo "=== Docker container status ===" + docker compose ps -a + for node in n1 n2 n3; do + echo "" + echo "=== Container logs: $node ===" + docker compose logs pgedge-$node 2>&1 | tail -80 || true + echo "" + echo "=== PostgreSQL logfile: $node ===" + docker compose cp pgedge-$node:/home/pgedge/logfile.log /dev/stdout 2>/dev/null | tail -80 || echo "(not available)" + done - name: Run installcheck on node n1 id: installcheck diff --git a/Makefile b/Makefile index 66bdcd08..7e040cae 100644 --- a/Makefile +++ b/Makefile @@ -50,8 +50,14 @@ all: spock.control # ----------------------------------------------------------------------------- # Regression tests # ----------------------------------------------------------------------------- +# PG18+ only tests +REGRESS_PG18 = +ifeq ($(shell test $(PGVER) -ge 18 && echo yes),yes) +REGRESS_PG18 = conflict_stat +endif + REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondary_unique \ - excluded_schema \ + excluded_schema $(REGRESS_PG18) \ toasted replication_set matview bidirectional primary_key \ interfaces foreign_key copy sequence triggers parallel functions row_filter \ row_filter_sampling att_list column_filter apply_delay \ diff --git a/include/spock_conflict.h b/include/spock_conflict.h index e805baa9..d0a41d78 100644 --- a/include/spock_conflict.h +++ b/include/spock_conflict.h @@ -51,7 +51,7 @@ extern bool spock_save_resolutions; typedef enum { /* The row to be inserted violates unique constraint */ - SPOCK_CT_INSERT_EXISTS, + SPOCK_CT_INSERT_EXISTS = 0, /* The row to be updated was modified by a different origin */ SPOCK_CT_UPDATE_ORIGIN_DIFFERS, @@ -76,6 +76,12 @@ typedef enum } SpockConflictType; +/* + * SPOCK_CT_DELETE_LATE is excluded because it is not yet tracked in conflict + * statistics. + */ +#define SPOCK_CONFLICT_NUM_TYPES (SPOCK_CT_DELETE_MISSING + 1) + extern int spock_conflict_resolver; extern int spock_conflict_log_level; extern bool spock_save_resolutions; diff --git a/include/spock_conflict_stat.h b/include/spock_conflict_stat.h new file mode 100644 index 00000000..492f8446 --- /dev/null +++ b/include/spock_conflict_stat.h @@ -0,0 +1,46 @@ +/*------------------------------------------------------------------------- + * + * spock_conflict_stat.h + * spock subscription conflict statistics + * + * Copyright (c) 2022-2026, pgEdge, Inc. + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, The Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#ifndef SPOCK_CONFLICT_STAT_H +#define SPOCK_CONFLICT_STAT_H + +#include "postgres.h" + +#if PG_VERSION_NUM >= 180000 + +#include "pgstat.h" + +#include "spock_conflict.h" + +/* Shared memory stats entry for spock subscription conflicts */ +typedef struct Spock_Stat_StatSubEntry +{ + PgStat_Counter conflict_count[SPOCK_CONFLICT_NUM_TYPES]; + TimestampTz stat_reset_timestamp; +} Spock_Stat_StatSubEntry; + +/* Pending (backend-local) entry for spock subscription conflicts */ +typedef struct Spock_Stat_PendingSubEntry +{ + PgStat_Counter conflict_count[SPOCK_CONFLICT_NUM_TYPES]; +} Spock_Stat_PendingSubEntry; + +extern void spock_stat_register_conflict_stat(void); + +extern void spock_stat_report_subscription_conflict(Oid subid, + SpockConflictType type); +extern void spock_stat_create_subscription(Oid subid); +extern void spock_stat_drop_subscription(Oid subid); +extern Spock_Stat_StatSubEntry *spock_stat_fetch_stat_subscription(Oid subid); + +#endif /* PG_VERSION_NUM >= 180000 */ + +#endif /* SPOCK_CONFLICT_STAT_H */ \ No newline at end of file diff --git a/sql/spock--5.0.4--6.0.0-devel.sql b/sql/spock--5.0.4--6.0.0-devel.sql index 7695cd1d..dd83be6a 100644 --- a/sql/spock--5.0.4--6.0.0-devel.sql +++ b/sql/spock--5.0.4--6.0.0-devel.sql @@ -68,6 +68,29 @@ SET conflict_type = CASE conflict_type ELSE conflict_type END; +-- ---- +-- Subscription conflict statistics +-- ---- +CREATE FUNCTION spock.get_subscription_stats( + subid oid, + OUT subid oid, + OUT confl_insert_exists bigint, + OUT confl_update_origin_differs bigint, + OUT confl_update_exists bigint, + OUT confl_update_missing bigint, + OUT confl_delete_origin_differs bigint, + OUT confl_delete_missing bigint, + OUT stats_reset timestamptz +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_get_subscription_stats' +LANGUAGE C STABLE; + +CREATE FUNCTION spock.reset_subscription_stats(subid oid DEFAULT NULL) +RETURNS void +AS 'MODULE_PATHNAME', 'spock_reset_subscription_stats' +LANGUAGE C CALLED ON NULL INPUT VOLATILE; + -- Set delta_apply security label on specific column CREATE FUNCTION spock.delta_apply( rel regclass, diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index 116face1..ae51d34e 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -712,6 +712,29 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- ---- +-- Subscription conflict statistics +-- ---- +CREATE FUNCTION spock.get_subscription_stats( + subid oid, + OUT subid oid, + OUT confl_insert_exists bigint, + OUT confl_update_origin_differs bigint, + OUT confl_update_exists bigint, + OUT confl_update_missing bigint, + OUT confl_delete_origin_differs bigint, + OUT confl_delete_missing bigint, + OUT stats_reset timestamptz +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_get_subscription_stats' +LANGUAGE C STABLE; + +CREATE FUNCTION spock.reset_subscription_stats(subid oid DEFAULT NULL) +RETURNS void +AS 'MODULE_PATHNAME', 'spock_reset_subscription_stats' +LANGUAGE C CALLED ON NULL INPUT VOLATILE; + -- Set delta_apply security label on specific column CREATE FUNCTION spock.delta_apply( rel regclass, diff --git a/src/spock.c b/src/spock.c index d989a6c1..02ce0c9d 100644 --- a/src/spock.c +++ b/src/spock.c @@ -53,6 +53,9 @@ #include "pgstat.h" #include "spock_apply.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_executor.h" #include "spock_node.h" #include "spock_conflict.h" @@ -1224,4 +1227,9 @@ _PG_init(void) /* Security label provider hook */ register_label_provider(SPOCK_SECLABEL_PROVIDER, spock_object_relabel); + +#if PG_VERSION_NUM >= 180000 + /* Spock replication conflict statistics */ + spock_stat_register_conflict_stat(); +#endif } diff --git a/src/spock_apply_heap.c b/src/spock_apply_heap.c index f6ba7457..47efa55e 100644 --- a/src/spock_apply_heap.c +++ b/src/spock_apply_heap.c @@ -64,6 +64,9 @@ #include "spock_common.h" #include "spock_conflict.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_executor.h" #include "spock_node.h" #include "spock_proto_native.h" @@ -1067,6 +1070,16 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup, /* SPOCK_CT_UPDATE_MISSING case gets logged in exception_log, not resolutions */ SpockExceptionLog *exception_log = &exception_log_ptr[my_exception_log_index]; +#if PG_VERSION_NUM >= 180000 + if (!MyApplyWorker->use_try_block) + /* + * To avoid duplicate messages, only report the conflict on the + * successful pathway. We skip counting when the update logic has + * already failed because the conflict would be misleading. + */ + spock_stat_report_subscription_conflict(MyApplyWorker->subid, + SPOCK_CT_UPDATE_MISSING); +#endif /* * The tuple to be updated could not be found. Do nothing except for * emitting a log message. TODO: Add pkey information as well. diff --git a/src/spock_conflict.c b/src/spock_conflict.c index e79a519f..b369273f 100644 --- a/src/spock_conflict.c +++ b/src/spock_conflict.c @@ -51,6 +51,9 @@ #include "spock.h" #include "spock_conflict.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_proto_native.h" #include "spock_node.h" #include "spock_worker.h" @@ -372,6 +375,15 @@ spock_report_conflict(SpockConflictType conflict_type, handle_stats_counter(rel->rel, MyApplyWorker->subid, SPOCK_STATS_CONFLICT_COUNT, 1); +#if PG_VERSION_NUM >= 180000 + /* + * TODO: Can't enable until SPOCK_CT_DELETE_LATE is either included in + * SPOCK_CONFLICT_NUM_TYPES or filtered out here — passing it as-is would + * overflow the conflict_count[] array. + * + * spock_stat_report_subscription_conflict(MyApplyWorker->subid, conflict_type); + */ +#endif /* If configured log resolution to spock.resolutions table */ spock_conflict_log_table(conflict_type, rel, localtuple, oldkey, remotetuple, applytuple, resolution, diff --git a/src/spock_conflict_stat.c b/src/spock_conflict_stat.c new file mode 100644 index 00000000..7d5fc311 --- /dev/null +++ b/src/spock_conflict_stat.c @@ -0,0 +1,309 @@ +/*------------------------------------------------------------------------- + * + * spock_conflict_stat.c + * spock subscription conflict statistics + * + * NOTE: Unlike PostgreSQL subscription statistics, Spock statistics cannot be + * cluster-wide because spock node ID, origin ID, and subscription ID are + * unique only within a database. Therefore, we use MyDatabaseId to identify + * each statistics entry. + * + * Copyright (c) 2022-2026, pgEdge, Inc. + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, The Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "funcapi.h" +#include "utils/pgstat_internal.h" + +#include "spock.h" + +PG_FUNCTION_INFO_V1(spock_get_subscription_stats); +PG_FUNCTION_INFO_V1(spock_reset_subscription_stats); + +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" + +/* + * Kind ID reserved for statistics of spock replication conflicts. + * TODO: see https://wiki.postgresql.org/wiki/CustomCumulativeStats to choose + * specific value in production + */ +#define SPOCK_PGSTAT_KIND_LRCONFLICTS 28 + +/* Shared memory wrapper for spock subscription conflict stats */ +typedef struct Spock_Stat_Subscription +{ + PgStatShared_Common header; + Spock_Stat_StatSubEntry stats; +} Spock_Stat_Subscription; + +/* + * Column names for spock_get_subscription_stats(), indexed by + * SpockConflictType. Kept in sync with the enum via designated initializers + * so that reordering the enum produces a compile-time error rather than + * silently wrong output. + */ +static const char *const SpockConflictStatColNames[SPOCK_CONFLICT_NUM_TYPES] = { + [SPOCK_CT_INSERT_EXISTS] = "confl_insert_exists", + [SPOCK_CT_UPDATE_ORIGIN_DIFFERS] = "confl_update_origin_differs", + [SPOCK_CT_UPDATE_EXISTS] = "confl_update_exists", + [SPOCK_CT_UPDATE_MISSING] = "confl_update_missing", + [SPOCK_CT_DELETE_ORIGIN_DIFFERS] = "confl_delete_origin_differs", + [SPOCK_CT_DELETE_MISSING] = "confl_delete_missing", +}; + +static bool spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, + bool nowait); +static void spock_stat_subscription_reset_timestamp_cb( + PgStatShared_Common *header, + TimestampTz ts); + +/* + * We rely on the pgstat infrastructure here, employing spock's own conflict + * detection algorithm with custom statistics storage. + */ + +static const PgStat_KindInfo spock_conflict_stat = { + .name = "spock_conflict_stat", + .fixed_amount = false, + .write_to_file = true, + + .shared_size = sizeof(Spock_Stat_Subscription), + .shared_data_off = offsetof(Spock_Stat_Subscription, stats), + .shared_data_len = sizeof(((Spock_Stat_Subscription *) 0)->stats), + .pending_size = sizeof(Spock_Stat_PendingSubEntry), + + .flush_pending_cb = spock_stat_subscription_flush_cb, + .reset_timestamp_cb = spock_stat_subscription_reset_timestamp_cb, +}; + +void +spock_stat_register_conflict_stat(void) +{ + pgstat_register_kind(SPOCK_PGSTAT_KIND_LRCONFLICTS, &spock_conflict_stat); +} + +/* + * Report a subscription conflict. + */ +void +spock_stat_report_subscription_conflict(Oid subid, SpockConflictType type) +{ + PgStat_EntryRef *entry_ref; + Spock_Stat_PendingSubEntry *pending; + + if (type != SPOCK_CT_UPDATE_MISSING) + /* + * Should happen only in development. Detect it as fast as possible + * with the highest error level that does not crash the instance. + */ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("unexpected conflict type %d reported for subscription %u", + type, subid))); + + entry_ref = pgstat_prep_pending_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, + MyDatabaseId, subid, NULL); + pending = entry_ref->pending; + pending->conflict_count[type]++; +} + +/* + * Report creating the subscription. + */ +void +spock_stat_create_subscription(Oid subid) +{ + PgStat_EntryRef *ref; + + /* Ensures that stats are dropped if transaction rolls back */ + pgstat_create_transactional(SPOCK_PGSTAT_KIND_LRCONFLICTS, + MyDatabaseId, subid); + + /* Create and initialize the subscription stats entry */ + ref = pgstat_get_entry_ref(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, + true, NULL); + + if (pg_atomic_read_u32(&ref->shared_entry->refcount) != 2) + /* + * Should never happen: a new subscription stats entry should have + * exactly two references (the hashtable entry and our own). A higher + * count means a stale entry from a previous subscription with the same + * OID was not properly cleaned up. + */ + ereport(WARNING, + (errmsg("conflict statistics entry for subscription %u " + "already has %u references", + subid, + pg_atomic_read_u32(&ref->shared_entry->refcount)), + errhint("This may indicate that a previous subscription with " + "the same OID was not fully dropped."))); + + pgstat_reset_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, 0); +} + +/* + * Report dropping the subscription. + * + * Ensures that stats are dropped if transaction commits. + */ +void +spock_stat_drop_subscription(Oid subid) +{ + pgstat_drop_transactional(SPOCK_PGSTAT_KIND_LRCONFLICTS, + MyDatabaseId, subid); +} + +/* + * Support function for the SQL-callable pgstat* functions. Returns + * the collected statistics for one subscription or NULL. + */ +Spock_Stat_StatSubEntry * +spock_stat_fetch_stat_subscription(Oid subid) +{ + return (Spock_Stat_StatSubEntry *) + pgstat_fetch_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid); +} + +/* + * Get the subscription statistics for the given subscription. If the + * subscription statistics is not available, return all-zeros stats. + */ +Datum +spock_get_subscription_stats(PG_FUNCTION_ARGS) +{ +#define SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS (1 + SPOCK_CONFLICT_NUM_TYPES + 1) + Oid subid = PG_GETARG_OID(0); + TupleDesc tupdesc; + Datum values[SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; + bool nulls[SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; + Spock_Stat_StatSubEntry *subentry; + Spock_Stat_StatSubEntry allzero; + int i = 0; + AttrNumber attnum = 1; + + /* Get subscription stats */ + subentry = spock_stat_fetch_stat_subscription(subid); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS); + TupleDescInitEntry(tupdesc, attnum++, "subid", + OIDOID, -1, 0); + for (int c = 0; c < SPOCK_CONFLICT_NUM_TYPES; c++) + TupleDescInitEntry(tupdesc, attnum++, SpockConflictStatColNames[c], + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, attnum++, "stats_reset", + TIMESTAMPTZOID, -1, 0); + BlessTupleDesc(tupdesc); + + if (!subentry) + { + /* If the subscription is not found, initialise its stats */ + memset(&allzero, 0, sizeof(Spock_Stat_StatSubEntry)); + subentry = &allzero; + } + + /* subid */ + values[i++] = ObjectIdGetDatum(subid); + + /* conflict counts */ + for (int nconflict = 0; nconflict < SPOCK_CONFLICT_NUM_TYPES; nconflict++) + values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]); + + /* stats_reset */ + if (subentry->stat_reset_timestamp == 0) + nulls[i] = true; + else + values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp); + + Assert(i + 1 == SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS); + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} +#undef SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS + +/* Reset subscription stats (a specific one or all of them) */ +Datum +spock_reset_subscription_stats(PG_FUNCTION_ARGS) +{ + Oid subid; + + if (PG_ARGISNULL(0)) + { + /* Clear all subscription stats */ + pgstat_reset_of_kind(SPOCK_PGSTAT_KIND_LRCONFLICTS); + } + else + { + subid = PG_GETARG_OID(0); + + if (!OidIsValid(subid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid subscription OID %u", subid))); + pgstat_reset(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid); + } + + PG_RETURN_VOID(); +} + +static bool +spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) +{ + Spock_Stat_PendingSubEntry *localent; + Spock_Stat_Subscription *shsubent; + + localent = (Spock_Stat_PendingSubEntry *) entry_ref->pending; + shsubent = (Spock_Stat_Subscription *) entry_ref->shared_stats; + + /* localent always has non-zero content */ + + if (!pgstat_lock_entry(entry_ref, nowait)) + return false; + + for (int i = 0; i < SPOCK_CONFLICT_NUM_TYPES; i++) + shsubent->stats.conflict_count[i] += localent->conflict_count[i]; + + pgstat_unlock_entry(entry_ref); + return true; +} + +static void +spock_stat_subscription_reset_timestamp_cb(PgStatShared_Common *header, + TimestampTz ts) +{ + ((Spock_Stat_Subscription *) header)->stats.stat_reset_timestamp = ts; +} + +#endif /* PG_VERSION_NUM >= 180000 */ + +#if PG_VERSION_NUM < 180000 + +/* + * XXX: implement conflict statistics gathering, if needed + */ + +Datum +spock_get_subscription_stats(PG_FUNCTION_ARGS) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("spock conflict statistics require PostgreSQL 18 or later"))); + PG_RETURN_NULL(); /* unreachable; suppress compiler warning */ +} + +Datum +spock_reset_subscription_stats(PG_FUNCTION_ARGS) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("spock conflict statistics require PostgreSQL 18 or later"))); + PG_RETURN_NULL(); /* unreachable; suppress compiler warning */ +} + +#endif /* PG_VERSION_NUM < 180000 */ diff --git a/src/spock_functions.c b/src/spock_functions.c index a7836321..5525fdf0 100644 --- a/src/spock_functions.c +++ b/src/spock_functions.c @@ -86,6 +86,9 @@ #include "spock_apply.h" #include "spock_conflict.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_dependency.h" #include "spock_executor.h" #include "spock_node.h" @@ -590,6 +593,9 @@ spock_create_subscription(PG_FUNCTION_ARGS) sub.skip_schema = textarray_to_list(skip_schema_names); create_subscription(&sub); +#if PG_VERSION_NUM >= 180000 + spock_stat_create_subscription(sub.id); +#endif /* Create progress entry to track commit ts per local/remote origin */ spock_group_attach(MyDatabaseId, localnode->node->id, originif.nodeid); @@ -664,6 +670,9 @@ spock_drop_subscription(PG_FUNCTION_ARGS) /* Drop the actual subscription. */ drop_subscription(sub->id); +#if PG_VERSION_NUM >= 180000 + spock_stat_drop_subscription(sub->id); +#endif /* * The rest is different depending on if we are doing this on provider diff --git a/tests/regress/expected/conflict_stat.out b/tests/regress/expected/conflict_stat.out new file mode 100644 index 00000000..2801b17c --- /dev/null +++ b/tests/regress/expected/conflict_stat.out @@ -0,0 +1,142 @@ +-- Test: conflict statistics for UPDATE_MISSING conflicts (PG18+ only) +SELECT * FROM spock_regress_variables() +\gset +\c :provider_dsn +-- Create a simple table for conflict testing +SELECT spock.replicate_ddl($$ + CREATE TABLE public.conflict_stat_test ( + id integer PRIMARY KEY, + data text + ); +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT * FROM spock.repset_add_table('default', 'conflict_stat_test'); + repset_add_table +------------------ + t +(1 row) + +-- Seed initial rows +INSERT INTO conflict_stat_test VALUES (1, 'row1'); +INSERT INTO conflict_stat_test VALUES (2, 'row2'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Get subscription OID for stats queries +SELECT sub_id AS test_sub_id FROM spock.subscription + WHERE sub_name = 'test_subscription' \gset +-- Reset conflict stats before the test +SELECT spock.reset_subscription_stats(:test_sub_id); + reset_subscription_stats +-------------------------- + +(1 row) + +-- Verify counters are zero initially +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 0 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Delete a row on subscriber only to set up UPDATE_MISSING +DELETE FROM conflict_stat_test WHERE id = 1; +TRUNCATE spock.exception_log; +\c :provider_dsn +-- Update the row that no longer exists on subscriber +UPDATE conflict_stat_test SET data = 'updated_row1' WHERE id = 1; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Row id=1 should still be missing on subscriber (update was skipped) +SELECT * FROM conflict_stat_test ORDER BY id; + id | data +----+------ + 2 | row2 +(1 row) + +-- The UPDATE_MISSING conflict should be logged in exception_log +SELECT operation, table_name FROM spock.exception_log; + operation | table_name +-----------+-------------------- + UPDATE | conflict_stat_test +(1 row) + +-- Verify that the UPDATE_MISSING conflict was counted +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 1 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Provoke a second UPDATE_MISSING to confirm counter increments +DELETE FROM conflict_stat_test WHERE id = 2; +TRUNCATE spock.exception_log; +\c :provider_dsn +UPDATE conflict_stat_test SET data = 'updated_row2' WHERE id = 2; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +SELECT * FROM conflict_stat_test ORDER BY id; + id | data +----+------ +(0 rows) + +-- Counter should now be 2 +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 2 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Test reset: clear the stats and verify counter goes back to zero +SELECT spock.reset_subscription_stats(:test_sub_id); + reset_subscription_stats +-------------------------- + +(1 row) + +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 0 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Cleanup +TRUNCATE spock.exception_log; +\c :provider_dsn +SELECT spock.replicate_ddl($$ DROP TABLE public.conflict_stat_test CASCADE; $$); +NOTICE: drop cascades to table conflict_stat_test membership in replication set default + replicate_ddl +--------------- + t +(1 row) + diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index 398ea9ac..5d917efb 100644 --- a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -223,6 +223,8 @@ NOTICE: drop cascades to 2 other objects (1 row) \c :subscriber_dsn +-- First time come by to the subscriber node. Clean the history in exception_log +TRUNCATE spock.exception_log; SELECT * FROM spock.replication_set; set_id | set_nodeid | set_name | replicate_insert | replicate_update | replicate_delete | replicate_truncate ------------+------------+---------------------+------------------+------------------+------------------+-------------------- @@ -445,7 +447,7 @@ SELECT * FROM spoc_102l ORDER BY x; -- Check exception log format SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -454,15 +456,15 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message ------------------+--------------+------------+-----------+----------------------------------------------------+------------------------------------------ - 1 | | | INSERT | | Spock can't find relation - 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; + table_schema | table_name | operation | remote_new_tup | error_message +--------------+------------+-----------+----------------------------------------------------+------------------------------------------ + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid (6 rows) \c :provider_dsn @@ -562,7 +564,7 @@ SELECT * FROM spoc_102g_u ORDER BY x; (2 rows) SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -571,19 +573,19 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message ------------------+--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- - 1 | | | INSERT | | Spock can't find relation - 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 7 | | | UPDATE | | Spock can't find relation - 8 | | | UPDATE | | Spock can't find relation - 9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 10 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; + table_schema | table_name | operation | remote_new_tup | error_message +--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | UPDATE | | Spock can't find relation + | | UPDATE | | Spock can't find relation + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) (10 rows) \c :provider_dsn diff --git a/tests/regress/sql/conflict_stat.sql b/tests/regress/sql/conflict_stat.sql new file mode 100644 index 00000000..f6d74a3e --- /dev/null +++ b/tests/regress/sql/conflict_stat.sql @@ -0,0 +1,91 @@ +-- Test: conflict statistics for UPDATE_MISSING conflicts (PG18+ only) +SELECT * FROM spock_regress_variables() +\gset + +\c :provider_dsn + +-- Create a simple table for conflict testing +SELECT spock.replicate_ddl($$ + CREATE TABLE public.conflict_stat_test ( + id integer PRIMARY KEY, + data text + ); +$$); + +SELECT * FROM spock.repset_add_table('default', 'conflict_stat_test'); + +-- Seed initial rows +INSERT INTO conflict_stat_test VALUES (1, 'row1'); +INSERT INTO conflict_stat_test VALUES (2, 'row2'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- Get subscription OID for stats queries +SELECT sub_id AS test_sub_id FROM spock.subscription + WHERE sub_name = 'test_subscription' \gset + +-- Reset conflict stats before the test +SELECT spock.reset_subscription_stats(:test_sub_id); + +-- Verify counters are zero initially +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Delete a row on subscriber only to set up UPDATE_MISSING +DELETE FROM conflict_stat_test WHERE id = 1; +TRUNCATE spock.exception_log; + +\c :provider_dsn + +-- Update the row that no longer exists on subscriber +UPDATE conflict_stat_test SET data = 'updated_row1' WHERE id = 1; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- Row id=1 should still be missing on subscriber (update was skipped) +SELECT * FROM conflict_stat_test ORDER BY id; + +-- The UPDATE_MISSING conflict should be logged in exception_log +SELECT operation, table_name FROM spock.exception_log; + +-- Verify that the UPDATE_MISSING conflict was counted +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Provoke a second UPDATE_MISSING to confirm counter increments +DELETE FROM conflict_stat_test WHERE id = 2; +TRUNCATE spock.exception_log; + +\c :provider_dsn + +UPDATE conflict_stat_test SET data = 'updated_row2' WHERE id = 2; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +SELECT * FROM conflict_stat_test ORDER BY id; + +-- Counter should now be 2 +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Test reset: clear the stats and verify counter goes back to zero +SELECT spock.reset_subscription_stats(:test_sub_id); + +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Cleanup +TRUNCATE spock.exception_log; +\c :provider_dsn +SELECT spock.replicate_ddl($$ DROP TABLE public.conflict_stat_test CASCADE; $$); diff --git a/tests/regress/sql/replication_set.sql b/tests/regress/sql/replication_set.sql index b9132a73..dae09930 100644 --- a/tests/regress/sql/replication_set.sql +++ b/tests/regress/sql/replication_set.sql @@ -98,6 +98,10 @@ SELECT spock.replicate_ddl($$ $$); \c :subscriber_dsn + +-- First time come by to the subscriber node. Clean the history in exception_log +TRUNCATE spock.exception_log; + SELECT * FROM spock.replication_set; -- Issue SPOC-102 @@ -203,7 +207,7 @@ SELECT * FROM spoc_102l ORDER BY x; -- Check exception log format SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -212,7 +216,7 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g,spoc_102l CASCADE'); @@ -257,7 +261,7 @@ SELECT * FROM spoc_102l_u ORDER BY x; SELECT * FROM spoc_102g_u ORDER BY x; SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -266,7 +270,7 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g_u,spoc_102l_u CASCADE');