diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 0492d92d23b1..5987d90ee080 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -52,11 +52,13 @@
 #include "catalog/pg_type.h"
 #include "nodes/execnodes.h"
 #include "pgstat.h"
+#include "replication/logicalworker.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "utils/injection_point.h"
 #include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
 
 /* ----------------------------------------------------------------
@@ -751,6 +753,13 @@ index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *
		 * the index.
		 */
		Assert(ItemPointerIsValid(&scan->xs_heaptid));
+#ifdef USE_INJECTION_POINTS
+		if (!IsCatalogRelation(scan->heapRelation) && IsLogicalWorker())
+		{
+			INJECTION_POINT("index_getnext_slot_before_fetch_apply_dirty", NULL);
+		}
+#endif
+
		if (index_fetch_heap(scan, slot))
			return true;
	}
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index 53d4a61dc3f1..634a3d10bb17 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -103,6 +103,15 @@ We also remember the left-link, and follow it when the scan moves backwards
 (though this requires extra handling to account for concurrent splits of
 the left sibling; see detailed move-left algorithm below).
 
+Despite these mechanics, non-MVCC scans (SnapshotDirty and SnapshotSelf) can
+still return inconsistent results.  This can happen when a concurrent
+transaction deletes a tuple and inserts a new version with a new TID on the
+same page, or on a page to the left/right (depending on scan direction) of
+the current scan position.  If the scan has already visited that page and
+cached its contents in backend-local storage, it may skip the old tuple
+(because it is deleted) yet never see the new one (because the page is not
+re-read).  Note that this affects not only btree scans but heap scans too.
+
 In most cases we release our lock and pin on a page before attempting
 to acquire pin and lock on the page we are moving to.  In a few places
 it is necessary to lock the next page before releasing the current one.
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 401606f840ad..f074d6c2765e 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -117,6 +117,7 @@
 #include "utils/multirangetypes.h"
 #include "utils/rangetypes.h"
 #include "utils/snapmgr.h"
+#include "utils/injection_point.h"
 
 /* waitMode argument to check_exclusion_or_unique_constraint() */
 typedef enum
@@ -780,7 +781,9 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
	/*
	 * Search the tuples that are in the index for any violations, including
	 * tuples that aren't visible yet.
-	 */
+	 * A dirty snapshot may miss some tuples in the case of concurrent
+	 * updates, but that is acceptable here.
+	 */
	InitDirtySnapshot(DirtySnapshot);
 
	for (i = 0; i < indnkeyatts; i++)
@@ -943,6 +946,8 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 
	ExecDropSingleTupleTableSlot(existing_slot);
 
+	if (!conflict)
+		INJECTION_POINT("check_exclusion_or_unique_constraint_no_conflict", NULL);
	return !conflict;
 }
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index def32774c90d..1e434ab697ab 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -186,8 +186,6 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
	ScanKeyData skey[INDEX_MAX_KEYS];
	int			skey_attoff;
	IndexScanDesc scan;
-	SnapshotData snap;
-	TransactionId xwait;
	Relation	idxrel;
	bool		found;
	TypeCacheEntry **eq = NULL;
@@ -198,17 +196,17 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 
	isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
 
-	InitDirtySnapshot(snap);
-
	/* Build scan key. */
	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
 
-	/* Start an index scan. */
-	scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
+	/* Start an index scan.  SnapshotAny is replaced with a real one below. */
+	scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
 
 retry:
	found = false;
-
+	PushActiveSnapshot(GetLatestSnapshot());
+	/* Refresh the scan snapshot on each retry */
+	scan->xs_snapshot = GetActiveSnapshot();
	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
	/* Try to find the tuple */
@@ -229,19 +227,6 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 
		ExecMaterializeSlot(outslot);
 
-		xwait = TransactionIdIsValid(snap.xmin) ?
-			snap.xmin : snap.xmax;
-
-		/*
-		 * If the tuple is locked, wait for locking transaction to finish and
-		 * retry.
-		 */
-		if (TransactionIdIsValid(xwait))
-		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
-		}
-
-		/* Found our tuple and it's not locked */
+		/* Found our tuple */
		found = true;
		break;
@@ -253,8 +238,6 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
		TM_FailureData tmfd;
		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
							   outslot,
							   GetCurrentCommandId(false),
@@ -263,13 +246,15 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
							   0 /* don't follow updates */ ,
							   &tmfd);
 
-		PopActiveSnapshot();
-
		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
			goto retry;
+		}
	}
 
	index_endscan(scan);
+	PopActiveSnapshot();
 
	/* Don't release lock until commit. */
	index_close(idxrel, NoLock);
@@ -370,9 +355,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 {
	TupleTableSlot *scanslot;
	TableScanDesc scan;
-	SnapshotData snap;
	TypeCacheEntry **eq;
-	TransactionId xwait;
	bool		found;
	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
 
@@ -380,13 +363,15 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
	eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 
-	/* Start a heap scan. */
-	InitDirtySnapshot(snap);
-	scan = table_beginscan(rel, &snap, 0, NULL);
+	/* Start a heap scan.  SnapshotAny is replaced with a real one below. */
+	scan = table_beginscan(rel, SnapshotAny, 0, NULL);
 
	scanslot = table_slot_create(rel, NULL);
 
 retry:
	found = false;
 
+	PushActiveSnapshot(GetLatestSnapshot());
+	/* Refresh the scan snapshot on each retry */
+	scan->rs_snapshot = GetActiveSnapshot();
	table_rescan(scan, NULL);
 
@@ -399,19 +384,6 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
			found = true;
			ExecCopySlot(outslot, scanslot);
 
-			xwait = TransactionIdIsValid(snap.xmin) ?
-				snap.xmin : snap.xmax;
-
-			/*
-			 * If the tuple is locked, wait for locking transaction to finish and
-			 * retry.
-			 */
-			if (TransactionIdIsValid(xwait))
-			{
-				XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-				goto retry;
-			}
-
-			/* Found our tuple and it's not locked */
+			/* Found our tuple */
			break;
		}
@@ -422,8 +394,6 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
		TM_FailureData tmfd;
		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
							   outslot,
							   GetCurrentCommandId(false),
@@ -432,13 +402,16 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
							   0 /* don't follow updates */ ,
							   &tmfd);
 
-		PopActiveSnapshot();
		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
			goto retry;
+		}
	}
 
	table_endscan(scan);
+	PopActiveSnapshot();
 
	ExecDropSingleTupleTableSlot(scanslot);
 
	return found;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7edd1c9cf060..0f2ffc754c99 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -286,6 +286,7 @@
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/guc.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -2962,7 +2963,10 @@ apply_handle_update_internal(ApplyExecutionData *edata,
			conflicttuple.origin != replorigin_session_origin)
			type = CT_UPDATE_DELETED;
		else
+		{
+			INJECTION_POINT("apply_handle_update_internal_update_missing", NULL);
			type = CT_UPDATE_MISSING;
+		}
 
		/* Store the new tuple for conflict reporting */
		slot_store_data(newslot, relmapentry, newtup);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 0e546ec14974..189dfd711038 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -53,6 +53,13 @@ typedef enum SnapshotType
	 *	- previous commands of this transaction
	 *	- changes made by the current command
	 *
+	 * Note: such a snapshot may miss an existing logical tuple in the case
+	 * of a concurrent update.  If a new version of a tuple is inserted on a
+	 * page the scan has already processed, while the old version is marked
+	 * with a committed xmax, the scan will skip the old version and never
+	 * encounter the new one during that scan, so the logical tuple is
+	 * missed entirely.
+	 *
	 * Does _not_ include:
	 *	- in-progress transactions (as of the current instant)
	 * -------------------------------------------------------------------------
@@ -82,6 +89,13 @@ typedef enum SnapshotType
	 * transaction and committed/aborted xacts are concerned.  However, it
	 * also includes the effects of other xacts still in progress.
	 *
+	 * Note: such a snapshot may miss an existing logical tuple in the case
+	 * of a concurrent update.  If a new version of a tuple is inserted on a
+	 * page the scan has already processed, while the old version is marked
+	 * with a committed or in-progress xmax, the scan will skip the old
+	 * version and never encounter the new one during that scan, so the
+	 * logical tuple is missed entirely.
+	 *
	 * A special hack is that when a snapshot of this type is used to
	 * determine tuple visibility, the passed-in snapshot struct is used as an
	 * output argument to return the xids of concurrent xacts that affected
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 85d10a89994e..b552ae60a88f 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -46,6 +46,10 @@ tests += {
      't/034_temporal.pl',
      't/035_conflicts.pl',
      't/036_sequences.pl',
+      't/037_delete_missing_race.pl',
+      't/038_update_missing_race.pl',
+      't/039_update_missing_with_retain.pl',
+      't/040_update_missing_simulation.pl',
      't/100_bugs.pl',
    ],
  },
diff --git a/src/test/subscription/t/037_delete_missing_race.pl b/src/test/subscription/t/037_delete_missing_race.pl
new file mode 100644
index 000000000000..51dd351dc105
--- /dev/null
+++ b/src/test/subscription/t/037_delete_missing_race.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflict detection in logical replication under a concurrent update race
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set to 0 to skip the race simulation; TODO: delete this before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY KEY, data text);");
+
+# Create a similar table on the subscriber, with an extra index to disable HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY KEY, data text);
+	CREATE INDEX data_index ON conf_tab(data);");
+
+# Set up extension to simulate the race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert a row to be deleted later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	CONNECTION '$publisher_connstr application_name=$appname'
+	PUBLICATION tap_pub");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition caused by the dirty-snapshot lookup
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres',
+		"SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Delete the tuple on the publisher
+$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE a=1;");
+
+if ($simulate_race_condition)
+{
+	# Wait for the apply worker to start searching for the tuple via the index
+	$node_subscriber->wait_for_event('logical replication apply worker',
+		'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update the tuple on the subscriber
+$psql_session_subscriber->query_until(
+	qr/start/, qq[
+	\\echo start
+	UPDATE conf_tab SET data = 'fromsubnew' WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+	# Wake up the apply worker
+	$node_subscriber->safe_psql('postgres', "
+	SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+	SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+	");
+}
+
+# The tuple was updated concurrently, so a conflict is expected
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# But the tuple should be deleted on the subscriber anyway
+is($node_subscriber->safe_psql('postgres', 'SELECT count(*) from conf_tab'), 0, 'record deleted on subscriber');
+
+ok(!$node_subscriber->log_contains(
+	qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=delete_missing/,
+	$log_offset), 'no spurious delete_missing conflict');
+
+ok($node_subscriber->log_contains(
+	qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=delete_origin_differs/,
+	$log_offset), 'delete_origin_differs conflict detected');
+
+done_testing();
diff --git a/src/test/subscription/t/038_update_missing_race.pl b/src/test/subscription/t/038_update_missing_race.pl
new file mode 100644
index 000000000000..1e120f74bbd3
--- /dev/null
+++ b/src/test/subscription/t/038_update_missing_race.pl
@@ -0,0 +1,141 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflict detection in logical replication under a concurrent update race
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set to 0 to skip the race simulation; TODO: delete this before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY KEY, data text);");
+
+# Create a similar table on the subscriber, with an extra column and an
+# index on it to disable HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY KEY, data text, i int DEFAULT 0);
+	CREATE INDEX i_index ON conf_tab(i);");
+
+# Set up extension to simulate the race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert a row to be updated later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	CONNECTION '$publisher_connstr application_name=$appname'
+	PUBLICATION tap_pub");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition caused by the dirty-snapshot lookup
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Update the tuple on the publisher
+$node_publisher->safe_psql('postgres',
+	"UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);");
+
+
+if ($simulate_race_condition)
+{
+	# Wait for the apply worker to start searching for the tuple via the index
+	$node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update only the additional (subscriber-only) column on the subscriber
+$psql_session_subscriber->query_until(
+	qr/start/, qq[
+	\\echo start
+	UPDATE conf_tab SET i = 1 WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+	# Wake up the apply worker
+	$node_subscriber->safe_psql('postgres', "
+	SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+	SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+	");
+}
+
+# The tuple was updated concurrently, so a conflict is expected
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# The new data value must be synced from the publisher
+is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber');
+# And the additional column must keep its locally updated value
+is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'additional column keeps its value on subscriber');
+
+ok(!$node_subscriber->log_contains(
+	qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_missing/,
+	$log_offset), 'no spurious update_missing conflict');
+
+ok($node_subscriber->log_contains(
+	qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/,
+	$log_offset), 'update_origin_differs conflict detected');
+
+done_testing();
diff --git a/src/test/subscription/t/039_update_missing_with_retain.pl b/src/test/subscription/t/039_update_missing_with_retain.pl
new file mode 100644
index 000000000000..7b225d45f7ff
--- /dev/null
+++ b/src/test/subscription/t/039_update_missing_with_retain.pl
@@ -0,0 +1,143 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflict detection in logical replication under a concurrent update
+# race, with retain_dead_tuples enabled
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set to 0 to skip the race simulation; TODO: delete this before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->append_conf('postgresql.conf',
+	qq(wal_level = 'replica'));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY KEY, data text);");
+
+# Create a similar table on the subscriber, with an extra column and an
+# index on it to disable HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY KEY, data text, i int DEFAULT 0);
+	CREATE INDEX i_index ON conf_tab(i);");
+
+# Set up extension to simulate the race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert a row to be updated later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	CONNECTION '$publisher_connstr application_name=$appname'
+	PUBLICATION tap_pub WITH (retain_dead_tuples = true)");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition caused by the dirty-snapshot lookup
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Update the tuple on the publisher
+$node_publisher->safe_psql('postgres',
+	"UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);");
+
+
+if ($simulate_race_condition)
+{
+	# Wait for the apply worker to start searching for the tuple via the index
+	$node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update only the additional (subscriber-only) column on the subscriber
+$psql_session_subscriber->query_until(
+	qr/start/, qq[
+	\\echo start
+	UPDATE conf_tab SET i = 1 WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+	# Wake up the apply worker
+	$node_subscriber->safe_psql('postgres', "
+	SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+	SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+	");
+}
+
+# The tuple was updated concurrently, so a conflict is expected
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# The new data value must be synced from the publisher
+is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber');
+# And the additional column must keep its locally updated value
+is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'additional column keeps its value on subscriber');
+
+ok(!$node_subscriber->log_contains(
+	qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_deleted/,
+	$log_offset), 'no spurious update_deleted conflict');
+
+ok($node_subscriber->log_contains(
+	qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/,
+	$log_offset), 'update_origin_differs conflict detected');
+
+done_testing();
diff --git a/src/test/subscription/t/040_update_missing_simulation.pl b/src/test/subscription/t/040_update_missing_simulation.pl
new file mode 100644
index 000000000000..21fcd1ceb53c
--- /dev/null
+++ b/src/test/subscription/t/040_update_missing_simulation.pl
@@ -0,0 +1,125 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Stress test for conflict detection in logical replication.
+# Not intended to be committed because it is quite heavy; it is here to
+# demonstrate that the race is reproducible with pgbench.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use IPC::Run qw(start finish);
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE tbl(a int PRIMARY KEY, data_pub int);");
+
+# Create a similar table on the subscriber, with an extra index to disable HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE tbl(a int PRIMARY KEY, data_pub int, data_sub int default 0);
+	CREATE INDEX data_index ON tbl(data_pub);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE tbl");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	CONNECTION '$publisher_connstr application_name=$appname'
+	PUBLICATION tap_pub");
+
+my $num_rows = 10;
+my $num_updates = 10000;
+my $num_clients = 10;
+$node_publisher->safe_psql('postgres', "INSERT INTO tbl SELECT i, i * i FROM generate_series(1,$num_rows) i");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# Prepare small pgbench scripts as files
+my $sub_sql = $node_subscriber->basedir . '/sub_update.sql';
+my $pub_sql = $node_publisher->basedir . '/pub_update.sql';
+
+open my $fh1, '>', $sub_sql or die $!;
+print $fh1 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_sub = data_sub + 1 WHERE a = :num;\n";
+close $fh1;
+
+open my $fh2, '>', $pub_sql or die $!;
+print $fh2 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_pub = data_pub + 1 WHERE a = :num;\n";
+close $fh2;
+
+my @sub_cmd = (
+	'pgbench',
+	'--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates",
+	'-p', $node_subscriber->port, '-h', $node_subscriber->host, '-f', $sub_sql, 'postgres');
+
+my @pub_cmd = (
+	'pgbench',
+	'--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates",
+	'-p', $node_publisher->port, '-h', $node_publisher->host, '-f', $pub_sql, 'postgres');
+
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+# The update_missing conflict should never be reported; error out if it is
+$node_subscriber->safe_psql('postgres',
+	"SELECT injection_points_attach('apply_handle_update_internal_update_missing', 'error')");
+my $log_offset = -s $node_subscriber->logfile;
+
+# Start both pgbench runs concurrently
+my ($sub_out, $sub_err, $pub_out, $pub_err) = ('', '', '', '');
+my $sub_h = start \@sub_cmd, '>', \$sub_out, '2>', \$sub_err;
+my $pub_h = start \@pub_cmd, '>', \$pub_out, '2>', \$pub_err;
+
+# Wait for completion
+finish $sub_h;
+finish $pub_h;
+
+like($sub_out, qr/actually processed/, 'subscriber pgbench completed');
+like($pub_out, qr/actually processed/, 'publisher pgbench completed');
+
+# Let the subscription catch up, then check expectations
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+ok(!$node_subscriber->log_contains(
+	qr/ERROR:  error triggered for injection point apply_handle_update_internal_update_missing/,
+	$log_offset), 'update_missing conflict never reported');
+
+done_testing();
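
For reviewers who want to reproduce the delete_missing race by hand rather
than through the TAP framework, the psql sketch below condenses what
t/037_delete_missing_race.pl automates.  It assumes a build configured with
injection points, the injection_points module installed on the subscriber,
and the conf_tab publication/subscription setup from that test; all point
and object names come from the patch above.

    -- Subscriber, session 1: make the apply worker pause after the index
    -- lookup has returned the old TID but before the heap fetch.
    CREATE EXTENSION injection_points;
    SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait');

    -- Publisher: delete the row.  The subscriber's apply worker now blocks
    -- at the injection point while handling this delete.
    DELETE FROM conf_tab WHERE a = 1;

    -- Subscriber, session 2: move the tuple to a new TID while the apply
    -- worker is paused.
    UPDATE conf_tab SET data = 'fromsubnew' WHERE a = 1;

    -- Subscriber, session 1: release the apply worker.  Without the fix it
    -- reports delete_missing; with the retry on a fresh snapshot it finds
    -- the new tuple version and reports delete_origin_differs.
    SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
    SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');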