diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml index 1b465bc8ba71..2896cd9e4290 100644 --- a/doc/src/sgml/func/func-admin.sgml +++ b/doc/src/sgml/func/func-admin.sgml @@ -1497,9 +1497,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset standby server. Temporary synced slots, if any, cannot be used for logical decoding and must be dropped after promotion. See for details. - Note that this function is primarily intended for testing and - debugging purposes and should be used with caution. Additionally, - this function cannot be executed if + Note that this function cannot be executed if sync_replication_slots is enabled and the slotsync worker is already running to perform the synchronization of slots. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index b803a819cf1f..b964937d509a 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -405,15 +405,13 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU periodic synchronization of failover slots, they can also be manually synchronized using the pg_sync_replication_slots function on the standby. - However, this function is primarily intended for testing and debugging and - should be used with caution. Unlike automatic synchronization, it does not - include cyclic retries, making it more prone to synchronization failures, - particularly during initial sync scenarios where the required WAL files - or catalog rows for the slot might have already been removed or are at risk - of being removed on the standby. In contrast, automatic synchronization + However, unlike automatic synchronization, it does not perform incremental + updates. It keeps retrying only until all + the failover slots that existed on the primary at the start of the function + call are synchronized.
Any slots created after the function begins will + not be synchronized. In contrast, automatic synchronization via sync_replication_slots provides continuous slot updates, enabling seamless failover and supporting high availability. - Therefore, it is the recommended method for synchronizing slots. diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index b122d99b0097..4d43a7eae218 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -39,6 +39,12 @@ * the last cycle. Refer to the comments above wait_for_slot_activity() for * more details. * + * If the pg_sync_replication_slots() API is used to sync the slots, and if the + * slots are not ready to be synced and are marked as RS_TEMPORARY because of + * any of the reasons mentioned above, then the API also waits and retries + * until the slots are marked as RS_PERSISTENT (which means sync-ready). Refer + * to the comments in SyncReplicationSlots() for more details. + * * Any standby synchronized slots will be dropped if they no longer need * to be synchronized. See comment atop drop_local_obsolete_slots() for more * details.
@@ -64,6 +70,7 @@ #include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/ps_status.h" #include "utils/timeout.h" @@ -100,6 +107,16 @@ typedef struct SlotSyncCtxStruct slock_t mutex; } SlotSyncCtxStruct; +/* + * Resources that slotsync_failure_callback() releases when + * pg_sync_replication_slots() fails: the connection and the slot name list. + */ +typedef struct SlotSyncApiFailureParams +{ + WalReceiverConn *wrconn; + List *slot_names; +} SlotSyncApiFailureParams; + static SlotSyncCtxStruct *SlotSyncCtx = NULL; /* GUC variable */ @@ -147,6 +164,7 @@ typedef struct RemoteSlot static void slotsync_failure_callback(int code, Datum arg); static void update_synced_slots_inactive_since(void); +static void slotsync_api_reread_config(void); /* * If necessary, update the local synced slot's metadata based on the data @@ -553,11 +571,15 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) * local ones, then update the LSNs and persist the local synced slot for * future synchronization; otherwise, do nothing. * + * If slot_persistence_pending is not NULL, it is set to true when the slot + * cannot be persisted yet, so that pg_sync_replication_slots() can retry. + * * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise * false. */ static bool -update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *slot_persistence_pending) { ReplicationSlot *slot = MyReplicationSlot; bool found_consistent_snapshot = false; @@ -576,11 +598,18 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* * The remote slot didn't catch up to locally reserved position. * - * We do not drop the slot because the restart_lsn can be ahead of the - * current location when recreating the slot in the next cycle. It may - * take more time to create such a slot.
Therefore, we keep this slot - * and attempt the synchronization in the next cycle. + * We do not drop the slot because the restart_lsn can be + * ahead of the current location when recreating the slot in + * the next cycle. It may take more time to create such a + * slot. Therefore, we keep this slot and attempt the + * synchronization in the next cycle. + * + * If the caller asked for it, also report that persistence is + * still pending, so that pg_sync_replication_slots() can retry. */ + if (slot_persistence_pending) + *slot_persistence_pending = true; + return false; } @@ -595,6 +624,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.", LSN_FORMAT_ARGS(slot->data.restart_lsn))); + /* No consistent snapshot yet; let the API caller know it must retry */ + if (slot_persistence_pending) + *slot_persistence_pending = true; + return false; } @@ -618,10 +651,14 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * updated. The slot is then persisted and is considered as sync-ready for * periodic syncs. * + * If slot_persistence_pending is not NULL, it is set to true when the slot + * cannot be persisted yet, so that pg_sync_replication_slots() can retry. + * * Returns TRUE if the local slot is updated. */ static bool -synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) +synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *slot_persistence_pending) { ReplicationSlot *slot; XLogRecPtr latestFlushPtr; @@ -715,7 +752,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) if (slot->data.persistency == RS_TEMPORARY) { slot_updated = update_and_persist_local_synced_slot(remote_slot, - remote_dbid); + remote_dbid, + slot_persistence_pending); } /* Slot ready for sync, so sync it.
*/ @@ -784,7 +822,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); - update_and_persist_local_synced_slot(remote_slot, remote_dbid); + update_and_persist_local_synced_slot(remote_slot, remote_dbid, + slot_persistence_pending); slot_updated = true; } @@ -795,15 +834,23 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) } /* - * Synchronize slots. + * Fetch remote slots. * - * Gets the failover logical slots info from the primary server and updates - * the slots locally. Creates the slots if not present on the standby. + * If slot_names is NIL, fetches all failover logical slots from the + * primary server, otherwise fetches only the ones with names in slot_names. + * + * Parameters: + * wrconn - Connection to the primary server + * slot_names - List of slot names (char *) to fetch from primary, + * or NIL to fetch all failover logical slots. + * + * Returns: + * List of RemoteSlot structures, to be freed by the caller. Returns NIL + * if no slot is found. * - * Returns TRUE if any of the slots gets updated in this sync-cycle. */ -static bool -synchronize_slots(WalReceiverConn *wrconn) +static List * +fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names) { #define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, @@ -812,29 +859,45 @@ synchronize_slots(WalReceiverConn *wrconn) WalRcvExecResult *res; TupleTableSlot *tupslot; List *remote_slot_list = NIL; - bool some_slot_updated = false; - bool started_tx = false; - const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," - " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," - " database, invalidation_reason" - " FROM pg_catalog.pg_replication_slots" - " WHERE failover and NOT temporary"; - - /* The syscache access in walrcv_exec() needs a transaction env.
*/ - if (!IsTransactionState()) + StringInfoData query; + + initStringInfo(&query); + appendStringInfoString(&query, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase," + " two_phase_at, failover," + " database, invalidation_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and NOT temporary"); + + if (slot_names != NIL) { - StartTransactionCommand(); - started_tx = true; + bool first_slot = true; + + /* + * Restrict the query to the given slots, quoting each name as a literal + */ + appendStringInfoString(&query, " AND slot_name IN ("); + + foreach_ptr(char, slot_name, slot_names) + { + if (!first_slot) + appendStringInfoString(&query, ", "); + + appendStringInfoString(&query, quote_literal_cstr(slot_name)); + first_slot = false; + } + appendStringInfoChar(&query, ')'); } /* Execute the query */ - res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow); + res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(query.data); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, errmsg("could not fetch failover logical slots info from the primary server: %s", res->err)); - /* Construct the remote_slot tuple and synchronize each slot locally */ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) { @@ -885,7 +948,6 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->invalidated = isnull ? RS_INVAL_NONE : GetSlotInvalidationCause(TextDatumGetCString(d)); - /* Sanity check */ Assert(col == SLOTSYNC_COLUMN_COUNT); /* @@ -905,12 +967,37 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->invalidated == RS_INVAL_NONE) pfree(remote_slot); else - /* Create list of remote slots */ remote_slot_list = lappend(remote_slot_list, remote_slot); ExecClearTuple(tupslot); } + walrcv_clear_result(res); + + return remote_slot_list; +} + +/* + * Synchronize slots.
+ * + * Takes a list of remote slots and synchronizes them locally. Creates the + * slots if not present on the standby and updates existing ones. + * + * Parameters: + * wrconn - Connection to the primary server + * remote_slot_list - List of RemoteSlot structures to synchronize. + * slot_persistence_pending - if not NULL, set to true when some slot could + * not be persisted yet, so that the + * pg_sync_replication_slots() API can retry. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. + */ +static bool +synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list, + bool *slot_persistence_pending) +{ + bool some_slot_updated = false; + /* Drop local slots that no longer need to be synced. */ drop_local_obsolete_slots(remote_slot_list); @@ -926,19 +1013,12 @@ synchronize_slots(WalReceiverConn *wrconn) */ LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); - some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); + some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid, + slot_persistence_pending); UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); } - /* We are done, free remote_slot_list elements */ - list_free_deep(remote_slot_list); - - walrcv_clear_result(res); - - if (started_tx) - CommitTransactionCommand(); - return some_slot_updated; } @@ -1186,6 +1266,26 @@ ProcessSlotSyncInterrupts(void) slotsync_reread_config(); } +/* + * Interrupt handler for pg_sync_replication_slots() API. + */ +static void +ProcessSlotSyncAPIInterrupts(void) +{ + CHECK_FOR_INTERRUPTS(); + + /* If we've been promoted, then no point continuing.
*/ + if (SlotSyncCtx->stopSignaled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot continue replication slots synchronization" + " as standby promotion is triggered"))); + + /* Reread the config; this errors out if a critical parameter changed */ + if (ConfigReloadPending) + slotsync_api_reread_config(); +} + /* * Connection cleanup function for slotsync worker. * @@ -1275,7 +1375,7 @@ wait_for_slot_activity(bool some_slot_updated) rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, sleep_ms, - WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN); + WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP); if (rc & WL_LATCH_SET) ResetLatch(MyLatch); @@ -1505,10 +1605,27 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) for (;;) { bool some_slot_updated = false; + bool started_tx = false; + List *remote_slots; ProcessSlotSyncInterrupts(); - some_slot_updated = synchronize_slots(wrconn); + /* + * The syscache access in fetch_remote_slots() needs a + * transaction env. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + remote_slots = fetch_remote_slots(wrconn, NIL); + some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL); + list_free_deep(remote_slots); + + if (started_tx) + CommitTransactionCommand(); wait_for_slot_activity(some_slot_updated); } @@ -1711,7 +1828,8 @@ SlotSyncShmemInit(void) static void slotsync_failure_callback(int code, Datum arg) { - WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg); + SlotSyncApiFailureParams *fparams = + (SlotSyncApiFailureParams *) DatumGetPointer(arg); /* * We need to do slots cleanup here just like WalSndErrorCleanup() does.
@@ -1738,23 +1856,170 @@ slotsync_failure_callback(int code, Datum arg) if (syncing_slots) reset_syncing_flag(); - walrcv_disconnect(wrconn); + if (fparams->slot_names) + list_free_deep(fparams->slot_names); + + walrcv_disconnect(fparams->wrconn); +} + +/* + * Helper function to extract slot names from a list of remote slots + */ +static List * +extract_slot_names(List *remote_slots) +{ + List *slot_names = NIL; + MemoryContext oldcontext; + + /* Switch to long-lived TopMemoryContext to store slot names */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + foreach_ptr(RemoteSlot, remote_slot, remote_slots) + { + char *slot_name; + + slot_name = pstrdup(remote_slot->name); + slot_names = lappend(slot_names, slot_name); + } + + MemoryContextSwitchTo(oldcontext); + + return slot_names; +} + +/* + * Re-read the config file and check for critical parameter changes. + * Raises an ERROR if one of them changed, so that the caller can retry. + */ +static void +slotsync_api_reread_config(void) +{ + char *old_primary_conninfo = pstrdup(PrimaryConnInfo); + char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_hot_standby_feedback = hot_standby_feedback; + bool conninfo_changed; + bool primary_slotname_changed; + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; + primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + + pfree(old_primary_conninfo); + pfree(old_primary_slotname); + + /* throw error for certain parameter changes */ + if (conninfo_changed || + primary_slotname_changed || + (old_hot_standby_feedback != hot_standby_feedback)) + { + ereport(ERROR, + (errcode(ERRCODE_CONFIG_FILE_ERROR), + errmsg("cannot continue slot synchronization due" + " to parameter changes"), + errdetail("One or more of primary_conninfo," + " primary_slot_name or hot_standby_feedback" + " were modified"), + errhint("Retry pg_sync_replication_slots() to use the" + " updated configuration."))); + } } /* * Synchronize the failover
enabled replication slots using the specified * primary server connection. + * + * Repeatedly fetches and updates replication slot information from the + * primary until all slots are at least "sync ready". + * Exits early if promotion is triggered or certain critical + * configuration parameters have changed. */ void SyncReplicationSlots(WalReceiverConn *wrconn) { - PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); + SlotSyncApiFailureParams fparams; + + fparams.wrconn = wrconn; + fparams.slot_names = NIL; + + PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(&fparams)); { + List *remote_slots = NIL; + List *slot_names = NIL; /* List of slot names to track */ + check_and_set_sync_info(InvalidPid); validate_remote_info(wrconn); - synchronize_slots(wrconn); + /* Retry until all the slots are sync-ready */ + for (;;) + { + bool started_tx = false; + bool slot_persistence_pending = false; + bool some_slot_updated = false; + + /* Check for interrupts and config changes */ + ProcessSlotSyncAPIInterrupts(); + + /* + * The syscache access in fetch_remote_slots() needs a + * transaction env. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* + * Fetch remote slot info for the given slot_names. If slot_names is NIL, + * fetch all failover-enabled slots. Note that we reuse slot_names from + * the first iteration; re-fetching all failover slots each time could + * cause an endless loop. Instead of reprocessing only the pending slots + * in each iteration, it's better to process all the slots received in + * the first iteration. This ensures that by the time we're done, all + * slots reflect the latest values.
+ */ + remote_slots = fetch_remote_slots(wrconn, slot_names); + + /* Attempt to synchronize slots */ + some_slot_updated = synchronize_slots(wrconn, remote_slots, + &slot_persistence_pending); + + /* + * If slot_persistence_pending is true, extract slot names + * for future iterations (only needed if we haven't done it yet) + */ + if (slot_names == NIL && slot_persistence_pending) + { + slot_names = extract_slot_names(remote_slots); + + /* Update the failure structure so that it can be freed on error */ + fparams.slot_names = slot_names; + } + + /* Free the current remote_slots list */ + list_free_deep(remote_slots); + + /* Commit transaction if we started it */ + if (started_tx) + CommitTransactionCommand(); + + /* Done if all slots are persisted i.e are sync-ready */ + if (!slot_persistence_pending) + break; + + /* wait before retrying again */ + wait_for_slot_activity(some_slot_updated); + } + + /* + * Clear the callback's reference before freeing, so that an error + * raised during the cleanup below cannot free the list again. + */ + fparams.slot_names = NIL; + list_free_deep(slot_names); /* Cleanup the synced temporary slots */ ReplicationSlotCleanup(true); @@ -1762,5 +2027,5 @@ SyncReplicationSlots(WalReceiverConn *wrconn) /* We are done with sync, so reset sync flag */ reset_syncing_flag(); } - PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); + PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(&fparams)); } diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 7553f6eacef7..16b3b04d3c4f 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
-REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." +REPLICATION_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch-up." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 3059bb8177be..4d42defa156c 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -211,21 +211,82 @@ 'synchronized slot has got its own inactive_since'); ################################################## -# Test that the synchronized slot will be dropped if the corresponding remote -# slot on the primary server has been dropped. +# Test that the synchronized slots will be dropped if the corresponding remote +# slots on the primary server have been dropped. ################################################## $primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');"); +$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub1_slot');"); $standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();"); is( $standby1->safe_psql( 'postgres', - q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';} + q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot');} ), "t", 'synchronized slot has been dropped'); +################################################## +# Test that pg_sync_replication_slots() on the standby waits and retries +# until the slot becomes sync-ready (once the remote slot has caught up to +# the position reserved locally on the standby). +################################################## + +# Recreate the slot by creating a subscription on the subscriber, keep it disabled.
+$subscriber1->safe_psql('postgres', qq[ + CREATE TABLE push_wal (a int); + CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true, enabled = false);]); + +# Run some DDL on the primary so that the slot lags behind the standby +$primary->safe_psql('postgres', "CREATE TABLE push_wal (a int);"); + +# Attempt to synchronize slots using API. This will initially fail because +# the slot is not yet sync-ready (the remote slot lags behind the standby), +# but the API will wait and retry. Call the API in a background process. +my $log_offset = -s $standby1->logfile; + +my $h = $standby1->background_psql('postgres', on_error_stop => 0); + +$h->query_until(qr//, "SELECT pg_sync_replication_slots();\n"); + +# Confirm that the slot could not be synced initially. +$standby1->wait_for_log( + qr/could not synchronize replication slot \"lsub1_slot\"/, + $log_offset); + +# Enable the subscription, so that the slot catches up +$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE"); +$subscriber1->wait_for_subscription_sync; + +# Create xl_running_xacts records on the primary for which the standby is waiting +$primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + +# Confirm from the log that the slot has been synced after becoming sync-ready. +$standby1->wait_for_log( + qr/newly created replication slot \"lsub1_slot\" is sync-ready now/, + $log_offset); + +$h->quit; + +# Confirm that the logical failover slot is created on the standby and is +# flagged as 'synced' +is( $standby1->safe_psql( + 'postgres', + q{SELECT count(*) = 1 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot') AND synced AND NOT temporary;} + ), + "t", + 'logical slots are synced after API retry on standby'); + +# Drop the subscription and the tables created, create the slot again so that it can +# be used later.
+$subscriber1->safe_psql('postgres',"DROP SUBSCRIPTION regress_mysub1"); +$primary->safe_psql('postgres',"DROP TABLE push_wal"); +$subscriber1->safe_psql('postgres',"DROP TABLE push_wal"); +$primary->psql('postgres', + q{SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);} +); + ################################################## # Test that if the synchronized slot is invalidated while the remote slot is # still valid, the slot will be dropped and re-created on the standby by @@ -281,7 +342,7 @@ # the failover slots. $primary->wait_for_replay_catchup($standby1); -my $log_offset = -s $standby1->logfile; +$log_offset = -s $standby1->logfile; # Synchronize the primary server slots to the standby. $standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();"); @@ -941,10 +1002,11 @@ $subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo';"); -# Confirm the synced slot 'lsub1_slot' is retained on the new primary +# Confirm that the synced slots 'lsub1_slot' and 'snap_test_slot' are retained on the new primary is( $standby1->safe_psql( 'postgres', q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'snap_test_slot') AND synced AND NOT temporary;} + ), 't', 'synced slot retained on the new primary');