From 5fbf8f1e28863dfdd8cda1529cd1d8b899655d1f Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 3 Jun 2025 12:52:22 -0700 Subject: [PATCH] Toggle logical decoding dynamically based on logical slot presence. Previously logical decoding required wal_level to be set to 'logical' at server start. This commit adds functionality to automatically control logical decoding availability based on logical replication slot presence. The newly introduced module logicalctl.c allows logical decoding to be dynamically activated when needed when wal_level is set to 'replica'. When the first logical replication slot is created, the system automatically increases the effective WAL level to maintain logical-level WAL records. Conversely, after the last logical slot is dropped or invalidated, it decreases back to 'replica' WAL level. A new read-only GUC parameter effective_wal_level is introduced to monitor the actual WAL level in effect. This parameter reflects the current operational WAL level, which may differ from the configured wal_level setting. While activation occurs synchronously right after creating the first logical slot, deactivation happens asynchronously through the checkpointer process. This design choice exists because deactivation requires waiting for concurrent attempts to update logical decoding status, which can be problematic when the process is holding interrupts. This situation arises when a process cleans up temporary or ephemeral slots on error or at process exit without releasing temporary slots explicitly. This lazy approach has a drawback: it may take longer to change the effective_wal_level and disable logical decoding, especially when the checkpointer is busy with other tasks. However, since dropping the last slot should not happen frequently, we chose this approach in all deactivation cases for simpler code implementation for simplicity, even though the lazy approach is required only in error cases or at process exit time in principle. In the future, we could address this limitation either by using a dedicated worker instead of the checkpointer, or by implementing synchronous waiting during slot drops if workloads are significantly affected by the lazy deactivation of logical decoding. XXX Bump PG_CONTROL_VERSION as it adds a new field to CheckPoint struct. Reviewed-by: Shveta Malik Reviewed-by: Shlok Kyal Reviewed-by: Bertrand Drouvot Reviewed-by: Amit Kapila Reviewed-by: Hayato Kuroda Reviewed-by: Ashutosh Bapat Discussion: https://postgr.es/m/CAD21AoCVLeLYq09pQPaWs+Jwdni5FuJ8v2jgq-u9_uFbcp6UbA@mail.gmail.com --- doc/src/sgml/config.sgml | 43 ++ doc/src/sgml/logical-replication.sgml | 4 +- doc/src/sgml/logicaldecoding.sgml | 46 +- doc/src/sgml/ref/pg_createsubscriber.sgml | 12 +- doc/src/sgml/system-views.sgml | 5 +- src/backend/access/heap/heapam.c | 9 +- src/backend/access/rmgrdesc/xlogdesc.c | 13 +- src/backend/access/transam/xact.c | 24 +- src/backend/access/transam/xlog.c | 94 ++- src/backend/commands/publicationcmds.c | 12 +- src/backend/commands/tablecmds.c | 2 +- src/backend/postmaster/checkpointer.c | 7 + src/backend/postmaster/postmaster.c | 4 +- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/decode.c | 32 +- src/backend/replication/logical/logical.c | 41 +- src/backend/replication/logical/logicalctl.c | 702 ++++++++++++++++++ src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 9 +- src/backend/replication/slot.c | 170 ++++- src/backend/replication/slotfuncs.c | 8 + src/backend/replication/walsender.c | 8 + src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/procsignal.c | 4 + src/backend/storage/ipc/standby.c | 8 +- .../utils/activity/wait_event_names.txt | 3 + src/backend/utils/cache/inval.c | 8 +- src/backend/utils/init/postinit.c | 4 + src/backend/utils/misc/guc_parameters.dat | 9 + src/bin/pg_basebackup/pg_createsubscriber.c | 6 +- .../t/040_pg_createsubscriber.pl | 2 - src/bin/pg_upgrade/check.c | 6 +- src/bin/pg_upgrade/t/002_pg_upgrade.pl | 4 + src/include/access/xlog.h | 16 +- src/include/catalog/pg_control.h | 2 + src/include/replication/logicalctl.h | 31 + src/include/replication/slot.h | 1 + src/include/storage/lwlocklist.h | 1 + src/include/storage/procsignal.h | 2 + src/include/utils/guc_hooks.h | 1 + src/test/recovery/meson.build | 1 + .../t/035_standby_logical_decoding.pl | 5 +- .../recovery/t/050_effective_wal_level.pl | 440 +++++++++++ src/test/regress/expected/publication.out | 4 - src/test/subscription/t/001_rep_changes.pl | 2 +- src/tools/pgindent/typedefs.list | 1 + 46 files changed, 1674 insertions(+), 137 deletions(-) create mode 100644 src/backend/replication/logical/logicalctl.c create mode 100644 src/include/replication/logicalctl.h create mode 100644 src/test/recovery/t/050_effective_wal_level.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 683f7c36f462..a7330f1d63bc 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3040,6 +3040,17 @@ include_dir 'conf.d' many UPDATE and DELETE statements are executed. + + It is important to note that when wal_level is set to + replica, the effective WAL level can automatically change + based on the presence of + logical replication slots. The system automatically increases the + effective WAL level to logical when creating the first + logical replication slot, and decreases it back to replica + when dropping the last logical replication slot. The current effective WAL + level can be monitored through + parameter. + In releases prior to 9.6, this parameter also allowed the values archive and hot_standby. @@ -11823,6 +11834,38 @@ dynamic_library_path = '/usr/local/lib/postgresql:$libdir' + + effective_wal_level (enum) + + effective_wal_level configuration parameter + + + + + Reports the actual WAL logging level currently in effect in the + system. This parameter shares the same set of values as + , but reflects the operational WAL + level rather than the configured setting. For descriptions of + possible values, refer to the wal_level + parameter documentation. + + + The effective WAL level can differ from the configured + wal_level in certain situations. For example, + when wal_level is set to replica + and the system has one or more logical replication slots, + effective_wal_level will show logical + to indicate that the system is maintaining WAL records at + logical level equivalent. + + + On standby servers, effective_wal_level matches + the value of effective_wal_level from the most + upstream server in the replication chain. + + + + huge_pages_status (enum) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index daab2cae9897..3b7e60e9df6e 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2377,7 +2377,7 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER wal_level must be - set to logical. + set to replica or logical. @@ -2498,7 +2498,7 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER The new cluster must have wal_level as - logical. + replica or logical. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index d5a5e22fe2c2..0a134cf50f83 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -47,7 +47,7 @@ Before you can use logical decoding, you must set - to logical and + to replica or higher and to at least 1. Then, you should connect to the target database (in the example below, postgres) as a superuser. @@ -257,6 +257,47 @@ postgres=# SELECT * from pg_logical_slot_get_changes('regression_slot', NULL, NU log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements. + + + Logical decoding becomes available in two conditions: + + + + + When is set to logical. + + + + + When is set to replica + and at least one valid logical replication slot exists on the system. + + + + + If either condition is met, the operational WAL level becomes equivalent + to logical, which can be monitored through the + parameter. + + + When wal_level is set to replica, + logical decoding is automatically activated upon creation of the first + logical replication slot. This activation process involves several steps + and requires synchronization among processes, ensuring system-wide + consistency. Conversely, if wal_level set to + replica and the last logical replication slot is dropped + or invalidated, logical decoding is automatically disabled. Note that the + deactivation of logical decoding might take some time as it is performed + asynchronously by the checkpointer process. + + + + + When wal_level is set to replica, + dropping or invalidating the last logical slot disables logical decoding + on the primary, resulting in slots on standbys being invalidated. + + @@ -328,8 +369,7 @@ postgres=# SELECT * from pg_logical_slot_get_changes('regression_slot', NULL, NU that could be needed by the logical decoding on the standby (as it does not know about the catalog_xmin on the standby). Existing logical slots on standby also get invalidated if - wal_level on the primary is reduced to less than - logical. + logical decoding is disabled on the primary. This is done as soon as the standby detects such a change in the WAL stream. It means that, for walsenders that are lagging (if any), some WAL records up to the wal_level parameter change on the primary won't be diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index bb9cc72576c4..5b44710639a1 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -379,12 +379,12 @@ PostgreSQL documentation The source server must accept connections from the target server. The source server must not be in recovery. The source server must have as logical. The source server - must have configured to a value - greater than or equal to the number of specified databases plus existing - replication slots. The source server must have configured to a value greater than or equal - to the number of specified databases and existing WAL sender processes. + linkend="guc-wal-level"/> as replica or logical. + The source server must have + configured to a value greater than or equal to the number of specified + databases plus existing replication slots. The source server must have + configured to a value greater than or + equal to the number of specified databases and existing WAL sender processes. diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 7971498fe75a..630372ecb191 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -3062,8 +3062,9 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx wal_level_insufficient means that the - primary doesn't have a sufficient to - perform logical decoding. It is set only for logical slots. + logical decoding is disabled on the primary (See + ). It is set + only for logical slots. diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 36fee9c994e4..8318f8000e4c 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8891,8 +8891,9 @@ log_heap_update(Relation reln, Buffer oldbuf, * * Skip this if we're taking a full-page image of the new page, as we * don't include the new tuple in the WAL record in that case. Also - * disable if wal_level='logical', as logical decoding needs to be able to - * read the new tuple in whole from the WAL record alone. + * disable if logical decoding is enabled and the relation requires WAL to + * be logged for logical decoding, as it needs to be able to read the new + * tuple in whole from the WAL record alone. */ if (oldbuf == newbuf && !need_tuple_data && !XLogCheckBufferNeedsBackup(newbuf)) @@ -9064,8 +9065,8 @@ log_heap_update(Relation reln, Buffer oldbuf, /* * Perform XLogInsert of an XLOG_HEAP2_NEW_CID record * - * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog - * tuples. + * This is only used when effective_wal_level is logical, and only for + * catalog tuples. */ static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup) diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index cd6c2a2f650a..11efde539d7e 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -66,7 +66,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record) CheckPoint *checkpoint = (CheckPoint *) rec; appendStringInfo(buf, "redo %X/%08X; " - "tli %u; prev tli %u; fpw %s; wal_level %s; xid %u:%u; oid %u; multi %u; offset %u; " + "tli %u; prev tli %u; fpw %s; wal_level %s; logical decoding %s; xid %u:%u; oid %u; multi %u; offset %u; " "oldest xid %u in DB %u; oldest multi %u in DB %u; " "oldest/newest commit timestamp xid: %u/%u; " "oldest running xid %u; %s", @@ -75,6 +75,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record) checkpoint->PrevTimeLineID, checkpoint->fullPageWrites ? "true" : "false", get_wal_level_string(checkpoint->wal_level), + checkpoint->logicalDecodingEnabled ? "true" : "false", EpochFromFullTransactionId(checkpoint->nextXid), XidFromFullTransactionId(checkpoint->nextXid), checkpoint->nextOid, @@ -167,6 +168,13 @@ xlog_desc(StringInfo buf, XLogReaderState *record) memcpy(&wal_level, rec, sizeof(int)); appendStringInfo(buf, "wal_level %s", get_wal_level_string(wal_level)); } + else if (info == XLOG_LOGICAL_DECODING_STATUS_CHANGE) + { + bool enabled; + + memcpy(&enabled, rec, sizeof(bool)); + appendStringInfoString(buf, enabled ? "true" : "false"); + } } const char * @@ -218,6 +226,9 @@ xlog_identify(uint8 info) case XLOG_CHECKPOINT_REDO: id = "CHECKPOINT_REDO"; break; + case XLOG_LOGICAL_DECODING_STATUS_CHANGE: + id = "LOGICAL_DECODING_STATUS_CHANGE"; + break; } return id; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 092e197eba33..d2d3c9b6ed60 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -552,9 +552,9 @@ MarkCurrentTransactionIdLoggedIfAny(void) * operation in a subtransaction. We require that for logical decoding, see * LogicalDecodingProcessRecord. * - * This returns true if wal_level >= logical and we are inside a valid - * subtransaction, for which the assignment was not yet written to any WAL - * record. + * This returns true if effective_wal_level is logical and we are inside + * a valid subtransaction, for which the assignment was not yet written to + * any WAL record. */ bool IsSubxactTopXidLogPending(void) @@ -563,7 +563,7 @@ IsSubxactTopXidLogPending(void) if (CurrentTransactionState->topXidLogged) return false; - /* wal_level has to be logical */ + /* effective_wal_level has to be logical */ if (!XLogLogicalInfoActive()) return false; @@ -682,14 +682,14 @@ AssignTransactionId(TransactionState s) } /* - * When wal_level=logical, guarantee that a subtransaction's xid can only - * be seen in the WAL stream if its toplevel xid has been logged before. - * If necessary we log an xact_assignment record with fewer than - * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set - * for a transaction even though it appears in a WAL record, we just might - * superfluously log something. That can happen when an xid is included - * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in - * xl_standby_locks. + * When effective_wal_level is logical, guarantee that a subtransaction's + * xid can only be seen in the WAL stream if its toplevel xid has been + * logged before. If necessary we log an xact_assignment record with fewer + * than PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't + * set for a transaction even though it appears in a WAL record, we just + * might superfluously log something. That can happen when an xid is + * included somewhere inside a wal record, but not in XLogRecord->xl_xid, + * like in xl_standby_locks. */ if (isSubXact && XLogLogicalInfoActive() && !TopTransactionStateData.didLogXid) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 101b616b028b..e40de3e4362b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -79,7 +79,9 @@ #include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/origin.h" +#include "replication/logicalctl.h" #include "replication/slot.h" +#include "replication/slotsync.h" #include "replication/snapbuild.h" #include "replication/walreceiver.h" #include "replication/walsender.h" @@ -143,6 +145,7 @@ bool XLOG_DEBUG = false; #endif int wal_segment_size = DEFAULT_XLOG_SEG_SIZE; +int effective_wal_level = WAL_LEVEL_REPLICA; /* * Number of WAL insertion locks to use. A higher value allows more insertions @@ -4875,6 +4878,27 @@ show_in_hot_standby(void) return RecoveryInProgress() ? "on" : "off"; } +/* + * GUC show_hook for effective_wal_level + */ +const char * +show_effective_wal_level(void) +{ + if (wal_level == WAL_LEVEL_MINIMAL) + return "minimal"; + + /* + * During recovery, effective_wal_level reflects the primary's + * configuration rather than the local wal_level value. Check the shared + * status instead of the local XLogLogicalInfo because XLogLogicalInfo is + * not updated synchronously during recovery. + */ + if (RecoveryInProgress()) + return IsXLogLogicalInfoEnabled() ? "logical" : "replica"; + + return XLogLogicalInfoActive() ? "logical" : "replica"; +} + /* * Read the control file, set respective GUCs. * @@ -5123,6 +5147,7 @@ BootStrapXLOG(uint32 data_checksum_version) checkPoint.ThisTimeLineID = BootstrapTimeLineID; checkPoint.PrevTimeLineID = BootstrapTimeLineID; checkPoint.fullPageWrites = fullPageWrites; + checkPoint.logicalDecodingEnabled = (wal_level == WAL_LEVEL_LOGICAL); checkPoint.wal_level = wal_level; checkPoint.nextXid = FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId); @@ -5649,6 +5674,12 @@ StartupXLOG(void) */ StartupReplicationSlots(); + /* + * Startup the logical decoding status with the last status stored in the + * checkpoint record. + */ + StartupLogicalDecodingStatus(checkPoint.logicalDecodingEnabled); + /* * Startup logical state, needs to be setup now so we have proper data * during crash recovery. @@ -6197,6 +6228,18 @@ StartupXLOG(void) */ CompleteCommitTsInitialization(); + /* + * Update logical decoding status in shared memory and write an + * XLOG_LOGICAL_DECODING_STATUS_CHANGE, if necessary. + * + * Note that this function starts to delay logical decoding status changes + * until the recovery state changes to DONE below, which is applied also + * for the checkpointer process in deactivation cases. Therefore, the + * startup should not do any operations that wait for the checkpointer + * because otherwise it easily ends up with a deadlock. + */ + UpdateLogicalDecodingStatusEndOfRecovery(); + /* Clean up EndOfWalRecoveryInfo data to appease Valgrind leak checking */ if (endOfRecoveryInfo->lastPage) pfree(endOfRecoveryInfo->lastPage); @@ -7172,6 +7215,8 @@ CreateCheckPoint(int flags) checkPoint.nextOid += TransamVariables->oidCount; LWLockRelease(OidGenLock); + checkPoint.logicalDecodingEnabled = IsLogicalDecodingEnabled(); + MultiXactGetCheckptMulti(shutdown, &checkPoint.nextMulti, &checkPoint.nextMultiOffset, @@ -8566,21 +8611,6 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); - /* - * Invalidate logical slots if we are in hot standby and the primary - * does not have a WAL level sufficient for logical decoding. No need - * to search for potentially conflicting logically slots if standby is - * running with wal_level lower than logical, because in that case, we - * would have either disallowed creation of logical slots or - * invalidated existing ones. - */ - if (InRecovery && InHotStandby && - xlrec.wal_level < WAL_LEVEL_LOGICAL && - wal_level >= WAL_LEVEL_LOGICAL) - InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, - 0, InvalidOid, - InvalidTransactionId); - LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; @@ -8648,6 +8678,40 @@ xlog_redo(XLogReaderState *record) { /* nothing to do here, just for informational purposes */ } + else if (info == XLOG_LOGICAL_DECODING_STATUS_CHANGE) + { + bool logical_decoding; + + /* Update the status on shared memory */ + memcpy(&logical_decoding, XLogRecGetData(record), sizeof(bool)); + UpdateLogicalDecodingStatus(logical_decoding, true); + + if (InRecovery && InHotStandby) + { + if (!logical_decoding) + { + /* + * Invalidate logical slots if we are in hot standby and the + * primary disabled logical decoding. + */ + InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, + 0, InvalidOid, + InvalidTransactionId); + } + else if (sync_replication_slots) + { + /* + * Signal the postmaster to launch the slotsync worker. + * + * XXX: For simplicity, we keep the slotsync worker running + * even after logical decoding is disabled. A future + * improvement can consider starting and stopping the worker + * based on logical decoding status change. + */ + kill(PostmasterPid, SIGUSR1); + } + } + } } /* diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 1faf3a8c3728..9f63cd946063 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -37,6 +37,7 @@ #include "parser/parse_clause.h" #include "parser/parse_collate.h" #include "parser/parse_relation.h" +#include "replication/logicalctl.h" #include "rewrite/rewriteHandler.h" #include "storage/lmgr.h" #include "utils/acl.h" @@ -975,11 +976,16 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); - if (wal_level != WAL_LEVEL_LOGICAL) + /* + * We don't need this warning message when wal_level >= 'replica' since + * logical decoding is automatically enabled up on a logical slot + * creation. + */ + if (wal_level < WAL_LEVEL_REPLICA) ereport(WARNING, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("\"wal_level\" is insufficient to publish logical changes"), - errhint("Set \"wal_level\" to \"logical\" before creating subscriptions."))); + errmsg("logical decoding must be enabled to publish logical changes"), + errhint("Before creating subscriptions, set \"wal_level\" >= \"replica\""))); return myself; } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 3aac459e483d..d6ede8bf5f59 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -2294,7 +2294,7 @@ ExecuteTruncateGuts(List *explicit_rels, xl_heap_truncate xlrec; int i = 0; - /* should only get here if wal_level >= logical */ + /* should only get here if effective_wal_level is 'logical' */ Assert(XLogLogicalInfoActive()); logrelids = palloc(list_length(relids_logged) * sizeof(Oid)); diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index e84e8663e966..152274c77221 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -51,6 +51,7 @@ #include "postmaster/bgwriter.h" #include "postmaster/interrupt.h" #include "replication/syncrep.h" +#include "replication/logicalctl.h" #include "storage/aio_subsys.h" #include "storage/bufmgr.h" #include "storage/condition_variable.h" @@ -559,6 +560,12 @@ CheckpointerMain(const void *startup_data, size_t startup_data_len) break; } + /* + * Disable logical decoding if someone requested it. See comments atop + * logicalctl.c. + */ + DisableLogicalDecodingIfNecessary(); + /* Check for archive_timeout and switch xlog files if necessary. */ CheckArchiveTimeout(); diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 7c064cf9fbb2..87a69fc9172e 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -854,9 +854,9 @@ PostmasterMain(int argc, char *argv[]) if (summarize_wal && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, (errmsg("WAL cannot be summarized when \"wal_level\" is \"minimal\""))); - if (sync_replication_slots && wal_level < WAL_LEVEL_LOGICAL) + if (sync_replication_slots && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, - (errmsg("replication slot synchronization (\"sync_replication_slots\" = on) requires \"wal_level\" >= \"logical\""))); + (errmsg("replication slot synchronization (\"sync_replication_slots\" = on) requires \"wal_level\" to be \"replica\" or \"logical\""))); /* * Other one-time internal sanity checks can go here, if they are fast. diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c719af1f8a94..455768a57f0f 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -20,6 +20,7 @@ OBJS = \ decode.o \ launcher.o \ logical.o \ + logicalctl.o \ logicalfuncs.o \ message.o \ origin.o \ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index cc03f0706e9c..ab46a27a2b55 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -150,34 +150,34 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ break; case XLOG_PARAMETER_CHANGE: + + /* + * Even if wal_level on the primary got decreased to 'replica', as + * long as there is at least one valid logical slot, the logical + * decoding remains enabled. So we don't check the logical + * decoding availability here but do so in + * XLOG_LOGICAL_DECODING_STATUS_CHANGE case. It covers the case + * where wal_level on the primary got decreased to 'minimal' too. + */ + break; + case XLOG_LOGICAL_DECODING_STATUS_CHANGE: { - xl_parameter_change *xlrec = - (xl_parameter_change *) XLogRecGetData(buf->record); + bool *logical_decoding = (bool *) XLogRecGetData(buf->record); - /* - * If wal_level on the primary is reduced to less than - * logical, we want to prevent existing logical slots from - * being used. Existing logical slots on the standby get - * invalidated when this WAL record is replayed; and further, - * slot creation fails when wal_level is not sufficient; but - * all these operations are not synchronized, so a logical - * slot may creep in while the wal_level is being reduced. - * Hence this extra check. - */ - if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + if (!(*logical_decoding)) { /* * This can occur only on a standby, as a primary would - * not allow to restart after changing wal_level < logical + * not allow to restart after changing wal_level < replica * if there is pre-existing logical slot. */ Assert(RecoveryInProgress()); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"))); + errmsg("logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary"))); } - break; } + break; case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 866f92cf7996..7a5a86313b64 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -36,6 +36,7 @@ #include "pgstat.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/reorderbuffer.h" #include "replication/slotsync.h" #include "replication/snapbuild.h" @@ -117,31 +118,31 @@ CheckLogicalDecodingRequirements(void) * needs the same check. */ - if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding requires \"wal_level\" >= \"logical\""))); - if (MyDatabaseId == InvalidOid) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - if (RecoveryInProgress()) - { - /* - * This check may have race conditions, but whenever - * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we - * verify that there are no existing logical replication slots. And to - * avoid races around creating a new slot, - * CheckLogicalDecodingRequirements() is called once before creating - * the slot, and once when logical decoding is initially starting up. - */ - if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"))); - } + /* CheckSlotRequirements() has already checked if wal_level >= 'replica' */ + + /* + * Check if logical decoding is available on standbys. Typically, when + * running a standby, RecoveryInProgress() returning true implies that + * LogicalDecodingStatusChangeAllowed() is false. However, during + * promotion, there is a brief transitional phase where + * RecoveryInProgress() remains true even though + * LogicalDecodingStatusChangeAllowed() has already turned true. + * + * In this window, logical decoding enable/disable operations are + * permitted on standby, anticipating its transition to primary. The + * actual wait for recovery completion is handled within + * start_logical_decoding_status_change(). + */ + if (!IsLogicalDecodingEnabled() && !LogicalDecodingStatusChangeAllowed()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary"), + errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\"."))); } /* diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c new file mode 100644 index 000000000000..11da6b29e66f --- /dev/null +++ b/src/backend/replication/logical/logicalctl.c @@ -0,0 +1,702 @@ +/*------------------------------------------------------------------------- + * logicalctl.c + * Functionality to control logical decoding status online. + * + * This module enables dynamic control of logical decoding availability. + * Logical decoding becomes active under two conditions: when the wal_level + * parameter is set to 'logical', or when at least one valid logical replication + * slot exists with wal_level set to 'replica'. The system disables logical + * decoding when neither condition is met. Therefore, the dynamic control + * of logical decoding availability is required only when wal_level is set + * to 'replica'. Logical decoding is always enabled when wal_level='logical' + * and always disabled when wal_level='minimal'. + * + * The core concept of dynamically enabling and disabling logical decoding + * is to separately control two aspects: writing information required for + * logical decoding to WAL records, and using logical decoding itself. During + * activation, we first enable logical WAL writing while keeping logical + * decoding disabled. This change is reflected in the read-only + * effective_wal_level GUC parameter. Once we ensure that all processes have + * updated to the latest effective_wal_level value, we then enable logical + * decoding. Deactivation follows a similar careful, multi-step process + * in reverse order. + * + * While activation occurs synchronously right after creating the first + * logical slot, deactivation happens asynchronously through the checkpointer + * process. This design choice exists because deactivation requires waiting + * for concurrent attempts to update the logical decoding status, which can be + * problematic when the process is holding interrupts. This situation arises + * when a process cleans up temporary or ephemeral slots on error or at process + * exit without releasing temporary slots explicitly. This lazy approach has + * a drawback: it may take longer to change the effective_wal_level and disable + * logical decoding, especially when the checkpointer is busy with other tasks. + * However, since dropping or invalidating the last slot should not happen + * frequently, we chose this approach in all deactivation cases for simpler code + * implementation, even though the lazy approach is required only in error cases + * or at process exit time in principle. In the future, we could address this + * limitation either by using a dedicated worker instead of the checkpointer, or + * by implementing synchronous waiting during slot drops if workloads are + * significantly affected by the lazy deactivation of logical decoding. + * + * Standby servers use the primary server's effective_wal_level and logical + * decoding status. Unlike normal activation and deactivation, these + * are updated simultaneously without status change coordination, solely by + * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level + * setting has no effect during this time. Upon promotion, we update the + * logical decoding status based on local conditions: the wal_level value and + * the presence of logical slots. + * + * In the future, we could extend support to include automatic transitions + * of effective_wal_level between 'minimal' and 'logical' WAL levels. However, + * this enhancement would require additional coordination mechanisms and + * careful implementation of operations such as terminating walsenders and + * archiver processes while carefully considering the sequence of operations + * to ensure system stability during these transitions. + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/replication/logical/logicalctl.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xloginsert.h" +#include "catalog/pg_control.h" +#include "miscadmin.h" +#include "storage/condition_variable.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "replication/logicalctl.h" +#include "replication/slot.h" +#include "utils/injection_point.h" +#include "utils/wait_event.h" +#include "utils/wait_event_types.h" + +/* + * Struct for controlling the logical decoding status. + * + * This struct is protected by LogicalDecodingControlLock. + */ +typedef struct LogicalDecodingCtlData +{ + /* + * This is the authoritative value used by all processes to determine + * whether to write additional information required by logical decoding to + * WAL. Since this information could be checked frequently, each process + * caches this value in XLogLogicalInfo for better performance. + */ + bool xlog_logical_info; + + /* True if logical decoding is available in the system */ + bool logical_decoding_enabled; + + /* + * This flag indicates whether logical decoding status changes are + * allowed. It is false during recovery and becomes true when recovery + * ends. Even when true, it specifically means "allowed after recovery has + * fully completed". + * + * This flag helps prevent race conditions with the startup process's + * end-of-recovery actions. After the startup process updates the logical + * decoding status at recovery end, other processes might attempt to + * toggle logical decoding before recovery fully completes (i.e., + * RecoveryInProgress() returns false) - a period when WAL writes are + * still not permitted. Therefore, when this flag is true, we must wait + * for recovery to fully complete before attempting an activation or a + * deactivation. + */ + bool status_change_allowed; + + /* True while the logical decoding status is being changed */ + bool status_change_inprogress; + + /* True if logical decoding might need to be disabled */ + bool pending_disable; + + /* Condition variable signaled when a status change completes */ + ConditionVariable cv; +} LogicalDecodingCtlData; + +static LogicalDecodingCtlData *LogicalDecodingCtl = NULL; + +/* + * A process local cache of LogicalDecodingCtl->xlog_logical_info. This is + * initialized at process startup time, and could be updated when absorbing + * the process barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). + */ +bool XLogLogicalInfo = false; + +static void update_xlog_logical_info(void); +static void abort_logical_decoding_activation(int code, Datum arg); +static bool start_logical_decoding_status_change(bool new_status); + +Size +LogicalDecodingCtlShmemSize(void) +{ + return sizeof(LogicalDecodingCtlData); +} + +void +LogicalDecodingCtlShmemInit(void) +{ + bool found; + + LogicalDecodingCtl = ShmemInitStruct("Logical decoding control", + LogicalDecodingCtlShmemSize(), + &found); + + if (!found) + { + MemSet(LogicalDecodingCtl, 0, LogicalDecodingCtlShmemSize()); + ConditionVariableInit(&LogicalDecodingCtl->cv); + } +} + +/* + * Initialize the logical decoding status in shmem at server startup. This + * must be called ONCE during postmaster or standalone-backend startup. + */ +void +StartupLogicalDecodingStatus(bool last_status) +{ + /* Logical decoding is always disabled when 'minimal' WAL level */ + if (wal_level == WAL_LEVEL_MINIMAL) + return; + + /* + * Set the initial logical decoding status based on the last status. If + * logical decoding was enabled before the last shutdown, it remains + * enabled as we might have set wal_level='logical' or have a few logical + * slots. + */ + UpdateLogicalDecodingStatus(last_status, false); +} + +/* + * Update the XLogLogicalInfo cache. + */ +static inline void +update_xlog_logical_info(void) +{ + XLogLogicalInfo = IsXLogLogicalInfoEnabled(); +} + +/* + * Initialize XLogLogicalInfo backend-private cache. This routine is called + * during process initialization. + */ +void +InitializeProcessXLogLogicalInfo(void) +{ + update_xlog_logical_info(); +} + +/* + * This routine is called when we are ordered to update XLogLogicalInfo + * by a ProcSignalBarrier. + */ +bool +ProcessBarrierUpdateXLogLogicalInfo(void) +{ + update_xlog_logical_info(); + return true; +} + +/* + * Check the shared memory state and return true if logical decoding is + * enabled on the system. + */ +bool +IsLogicalDecodingEnabled(void) +{ + bool enabled; + + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + enabled = LogicalDecodingCtl->logical_decoding_enabled; + LWLockRelease(LogicalDecodingControlLock); + + return enabled; +} + +/* + * Returns true if logical WAL logging is enabled based on the shared memory + * status. + */ +bool +IsXLogLogicalInfoEnabled(void) +{ + bool xlog_logical_info; + + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + xlog_logical_info = LogicalDecodingCtl->xlog_logical_info; + LWLockRelease(LogicalDecodingControlLock); + + return xlog_logical_info; +} + +/* + * Enable or disable both the status of logical info WAL logging and logical + * decoding in shmem. + + * Note that this function updates the global flags without the state transition + * process. EnsureLogicalDecodingEnabled() and DisableLogicalDecodingIfNecessary() + * should be used instead if there could be concurrent processes doing writes + * or logical decoding, particularly once the status change is allowed globally. + */ +void +UpdateLogicalDecodingStatus(bool new_status, bool need_lock) +{ + if (need_lock) + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + /* Must be called before allowing the status change globally */ + Assert(!LogicalDecodingCtl->status_change_allowed); + + LogicalDecodingCtl->xlog_logical_info = new_status; + LogicalDecodingCtl->logical_decoding_enabled = new_status; + + if (need_lock) + LWLockRelease(LogicalDecodingControlLock); + + elog(DEBUG1, "update logical decoding status to %d", new_status); +} + +/* + * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting + * the shared flags to revert the logical decoding activation process. + */ +static void +abort_logical_decoding_activation(int code, Datum arg) +{ + Assert(LogicalDecodingCtl->status_change_inprogress); + + elog(DEBUG1, "aborting logical decoding activation process"); + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->xlog_logical_info = false; + LWLockRelease(LogicalDecodingControlLock); + + /* + * Some processes might have already started logical info WAL logging, so + * tell all running processes to update their caches. We don't need to + * wait for all processes to disable xlog_logical_info locally as it's + * always safe to write logical information to WAL records, even when not + * strictly required. + */ + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->status_change_inprogress = false; + LWLockRelease(LogicalDecodingControlLock); + + /* Let waiters know the status change completed */ + ConditionVariableBroadcast(&LogicalDecodingCtl->cv); +} + +/* + * Returns the status_change_allowed flag in LogicalDecodingCtl. The caller + * might need to check RecoveryInProgress() as well. Please see the comments for + * the status_change_allowed flag for details. + */ +bool +LogicalDecodingStatusChangeAllowed(void) +{ + bool status_change_allowed; + + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + status_change_allowed = LogicalDecodingCtl->status_change_allowed; + LWLockRelease(LogicalDecodingControlLock); + + return status_change_allowed; +} + +/* + * Performs preparation work required before changing the logical decoding + * status. If the status change is required, it sets + * LogicalDecodingCtl->status_change_inprogress, and returns true. Otherwise, + * if it's not required or not allowed (e.g., logical slots exist or during + * recovery) it returns false. + */ +static bool +start_logical_decoding_status_change(bool new_status) +{ + if (!LogicalDecodingStatusChangeAllowed()) + return false; + + if (RecoveryInProgress()) + { + /* + * Wait for the recovery to complete. Note that even the checkpointer + * can wait for the recovery to complete here without concerning + * deadlocks unless the startup process performs any action that waits + * for it after calling UpdateLogicalDecodingStatusEndOfRecovery(). + */ + + elog(DEBUG1, + "waiting for recovery completion to change logical decoding status"); + do + { + CHECK_FOR_INTERRUPTS(); + + pgstat_report_wait_start(WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE_DELAY); + pg_usleep(100000L); /* wait for 100 msec */ + pgstat_report_wait_end(); + } + while (RecoveryInProgress()); + + /* + * Now that writing WAL records are officially allowed, start the + * logical decoding status change. + */ + } + +retry: + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + /* + * When attempting to disable logical decoding, if there is at least one + * valid logical slot, we cannot disable it. We need to check it here + * since slots could be created or dropped while waiting for the status + * change below. + */ + if (!new_status && CheckLogicalSlotExists()) + { + LogicalDecodingCtl->pending_disable = false; + LWLockRelease(LogicalDecodingControlLock); + return false; + } + + /* If a status change is in-progress, we need to wait for completion */ + if (LogicalDecodingCtl->status_change_inprogress) + { + /* Release the lock and wait for someone to complete the transition */ + LWLockRelease(LogicalDecodingControlLock); + ConditionVariableSleep(&LogicalDecodingCtl->cv, + WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE); + + goto retry; + } + + /* Return if we don't need to change the status */ + if (LogicalDecodingCtl->logical_decoding_enabled == new_status) + { + LogicalDecodingCtl->pending_disable = false; + LWLockRelease(LogicalDecodingControlLock); + return false; + } + + /* Mark the state transition is in-progress */ + LogicalDecodingCtl->status_change_inprogress = true; + + LWLockRelease(LogicalDecodingControlLock); + + return true; +} + +/* + * Enable logical decoding if disabled. + * + * If this function is called during recovery, it simply returns without + * action since the logical decoding status change is not allowed during + * this time. The logical decoding status depends on the status on the primary. + * The caller can use CheckLogicalDecodingRequirements() before calling this + * function to make sure that the logical decoding status can be modified. + * + * Note that there is no interlock between logical decoding activation + * and slot creation. To ensure enabling logical decoding, the caller + * needs to call this function after creating a logical slot before + * initializing the logical decoding context. + */ +void +EnsureLogicalDecodingEnabled(void) +{ + Assert(MyReplicationSlot); + + if (wal_level == WAL_LEVEL_MINIMAL) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot enable logical decoding when \"wal_level\" = \"minimal\"")); + + /* Logical decoding is always enabled */ + if (wal_level >= WAL_LEVEL_LOGICAL) + return; + + /* Prepare and start the activation process if it's disabled */ + if (!start_logical_decoding_status_change(true)) + return; + + /* + * Ensure we reset the activation process if we cancelled or errored out + * below + */ + PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0); + { + /* + * Set logical info WAL logging in shmem. All process starts after + * this point will include the information required by logical + * decoding to WAL records. + */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->xlog_logical_info = true; + LWLockRelease(LogicalDecodingControlLock); + + /* + * Tell all running processes to reflect the xlog_logical_info update, + * and wait. This ensures that all running processes have enabled + * logical information WAL logging. + */ + WaitForProcSignalBarrier( + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO)); + + INJECTION_POINT("logical-decoding-activation", NULL); + + /* + * There could be some transactions that might have started with the + * old status, but we don't need to wait for these transactions to + * complete. These transactions will appear in the xl_running_xacts + * record and therefore the snapshot builder will not try to decode + * the transaction during the logical decoding initialization. + */ + } + PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0); + + START_CRIT_SECTION(); + + /* + * We enable logical decoding first, followed by writing the WAL record. + * This sequence ensures logical decoding becomes available on the primary + * first. + */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->logical_decoding_enabled = true; + LWLockRelease(LogicalDecodingControlLock); + + { + XLogRecPtr recptr; + bool logical_decoding = true; + + XLogBeginInsert(); + XLogRegisterData(&logical_decoding, sizeof(bool)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + XLogFlush(recptr); + } + + /* Complete the transition */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->status_change_inprogress = false; + LogicalDecodingCtl->pending_disable = false; + LWLockRelease(LogicalDecodingControlLock); + + END_CRIT_SECTION(); + + ereport(LOG, + errmsg("logical decoding is enabled upon creating a new logical replication slot")); + + /* Let waiters know the work finished */ + ConditionVariableBroadcast(&LogicalDecodingCtl->cv); +} + +/* + * Initiate a request for disabling logical decoding. + * + * This function does not verify whether logical slots exist. It should be + * called after dropping or invalidating what might be the last logical + * replication slot. + */ +void +RequestDisableLogicalDecoding(void) +{ + volatile PROC_HDR *procglobal = ProcGlobal; + ProcNumber checkpointerProc = procglobal->checkpointerProc; + + if (wal_level != WAL_LEVEL_REPLICA) + return; + + /* + * Check if the status change is allowed before initiating a disable + * request, to avoid unnecessary work. + */ + if (!LogicalDecodingStatusChangeAllowed()) + return; + + /* + * It's possible that we might not actually need to disable logical + * decoding if someone creates a new logical slot concurrently. We set the + * flag anyway and the checkpointer will check it and disable logical + * decoding if necessary. + */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->pending_disable = true; + LWLockRelease(LogicalDecodingControlLock); + + /* Wake up the checkpointer */ + if (checkpointerProc != INVALID_PROC_NUMBER) + SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch); + + elog(DEBUG1, "requested disabling logical decoding"); +} + +/* + * Disable logical decoding if necessary. + * + * This function disables logical decoding upon a request initiated by + * RequestDisableLogicalDecoding(). Otherwise, it performs no action. + */ +void +DisableLogicalDecodingIfNecessary(void) +{ + bool pending_disable; + + if (wal_level != WAL_LEVEL_REPLICA) + return; + + /* + * Sanity check as we cannot disable logical decoding while holding a + * logical slot. + */ + Assert(!MyReplicationSlot); + + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + pending_disable = LogicalDecodingCtl->pending_disable; + LWLockRelease(LogicalDecodingControlLock); + + /* Quick return if no pending disable request */ + if (!pending_disable) + return; + + /* Prepare and start the deactivation process if it's enabled */ + if (!start_logical_decoding_status_change(false)) + return; + + /* + * We don't need PG_ENSURE_ERROR_CLEANUP() to abort the deactivation + * process as all subsequent operations are expected to be + * non-interruptible and not to throw an ERROR or a FATAL. + */ + + START_CRIT_SECTION(); + + /* + * We need to disable logical decoding first and then disable logical + * information WAL logging in order to ensure that no logical decoding + * processes WAL records with insufficient information. + */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->logical_decoding_enabled = false; + LWLockRelease(LogicalDecodingControlLock); + + /* Write the WAL to disable logical decoding on standbys too */ + if (XLogStandbyInfoActive()) + { + bool logical_decoding = false; + XLogRecPtr recptr; + + XLogBeginInsert(); + XLogRegisterData(&logical_decoding, sizeof(bool)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + XLogFlush(recptr); + } + + /* Now disable logical information WAL logging */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->xlog_logical_info = false; + LWLockRelease(LogicalDecodingControlLock); + + /* + * Tell all running processes to reflect the xlog_logical_info update. + * Unlike when enabling logical decoding, we don't need to wait for all + * processes to complete it in this case. We already disabled logical + * decoding and it's always safe to write logical information to WAL + * records, even when not strictly required. Therefore, we don't need to + * wait for all running transactions to finish either. + */ + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); + + /* Complete the transition */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->status_change_inprogress = false; + LogicalDecodingCtl->pending_disable = false; + LWLockRelease(LogicalDecodingControlLock); + + END_CRIT_SECTION(); + + ereport(LOG, + errmsg("logical decoding is disabled because all logical replication slots are removed")); + + /* Let waiters know the work finished */ + ConditionVariableBroadcast(&LogicalDecodingCtl->cv); +} + +/* + * Update the logical decoding status at end of the recovery. This function + * must be called before accepting writes. + */ +void +UpdateLogicalDecodingStatusEndOfRecovery(void) +{ + bool new_status = false; + bool need_wal = false; + + Assert(RecoveryInProgress()); + Assert(!LogicalDecodingCtl->status_change_inprogress); + + /* With 'minimal' WAL level, logical decoding is always disabled */ + if (wal_level == WAL_LEVEL_MINIMAL) + { + Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled()); + return; + } + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists()) + new_status = true; + + if (LogicalDecodingCtl->logical_decoding_enabled != new_status) + need_wal = true; + + /* + * Update shmem flags. We don't need to care about the order of setting + * global flag and writing the WAL record as writes are not allowed yet. + */ + UpdateLogicalDecodingStatus(new_status, false); + + /* + * Mark the end-of-recovery action has been done, allowing processes to + * change the logical decoding status after the recovery finished. + */ + LogicalDecodingCtl->status_change_allowed = true; + + LWLockRelease(LogicalDecodingControlLock); + + if (need_wal) + { + XLogRecPtr recptr; + + Assert(XLogStandbyInfoActive()); + + XLogBeginInsert(); + XLogRegisterData(&new_status, sizeof(bool)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + XLogFlush(recptr); + } + + /* + * Ensure all running processes have the updated status. We don't need to + * wait for running transactions to finish as we don't accept any writes + * yet. We need the wait even if we've not updated the status above as the + * status have been turned on and off during recovery, having running + * processes have different status on their local caches. + */ + if (IsUnderPostmaster) + WaitForProcSignalBarrier( + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO)); + + INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL); +} diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index a2268d8361ee..928b503addf5 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -6,6 +6,7 @@ backend_sources += files( 'decode.c', 'launcher.c', 'logical.c', + 'logicalctl.c', 'logicalfuncs.c', 'message.c', 'origin.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8b4afd87dc91..da9749da4d63 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -56,6 +56,7 @@ #include "pgstat.h" #include "postmaster/interrupt.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/slotsync.h" #include "replication/snapbuild.h" #include "storage/ipc.h" @@ -1057,13 +1058,15 @@ bool ValidateSlotSyncParams(int elevel) { /* - * Logical slot sync/creation requires wal_level >= logical. + * Logical slot sync/creation requires logical decoding to be enabled. */ - if (wal_level < WAL_LEVEL_LOGICAL) + if (!IsLogicalDecodingEnabled()) { ereport(elevel, errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\"")); + errmsg("replication slot synchronization requires \"effective_wal_level\" >= \"logical\" on the primary"), + errhint("To enable logical decoding on primary, set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\".")); + return false; } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1ec1e997b27d..7e2d933b2a4e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -48,6 +48,7 @@ #include "pgstat.h" #include "postmaster/interrupt.h" #include "replication/logicallauncher.h" +#include "replication/logicalctl.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/walsender_private.h" @@ -764,16 +765,15 @@ ReplicationSlotRelease(void) { ReplicationSlot *slot = MyReplicationSlot; char *slotname = NULL; /* keep compiler quiet */ - bool is_logical = false; /* keep compiler quiet */ + bool is_logical; TimestampTz now = 0; Assert(slot != NULL && slot->active_pid != 0); + is_logical = SlotIsLogical(slot); + if (am_walsender) - { slotname = pstrdup(NameStr(slot->data.name)); - is_logical = SlotIsLogical(slot); - } if (slot->data.persistency == RS_EPHEMERAL) { @@ -783,6 +783,14 @@ ReplicationSlotRelease(void) * data. */ ReplicationSlotDropAcquired(); + + /* + * Request to disable logical decoding, even though this slot may not + * have been the last logical slot. The checkpointer will verify if + * logical decoding should actually be disabled. + */ + if (is_logical) + RequestDisableLogicalDecoding(); } /* @@ -852,10 +860,13 @@ void ReplicationSlotCleanup(bool synced_only) { int i; + bool dropped_logical = false; + int n_valid_logicalslots; Assert(MyReplicationSlot == NULL); restart: + n_valid_logicalslots = 0; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -865,6 +876,10 @@ ReplicationSlotCleanup(bool synced_only) continue; SpinLockAcquire(&s->mutex); + + if (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE) + n_valid_logicalslots++; + if ((s->active_pid == MyProcPid && (!synced_only || s->data.synced))) { @@ -872,6 +887,9 @@ ReplicationSlotCleanup(bool synced_only) SpinLockRelease(&s->mutex); LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */ + if (SlotIsLogical(s)) + dropped_logical = true; + ReplicationSlotDropPtr(s); ConditionVariableBroadcast(&s->active_cv); @@ -882,6 +900,9 @@ ReplicationSlotCleanup(bool synced_only) } LWLockRelease(ReplicationSlotControlLock); + + if (dropped_logical && n_valid_logicalslots == 0) + RequestDisableLogicalDecoding(); } /* @@ -890,6 +911,8 @@ ReplicationSlotCleanup(bool synced_only) void ReplicationSlotDrop(const char *name, bool nowait) { + bool is_logical; + Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name, nowait, false); @@ -904,7 +927,12 @@ ReplicationSlotDrop(const char *name, bool nowait) errmsg("cannot drop replication slot \"%s\"", name), errdetail("This replication slot is being synchronized from the primary server.")); + is_logical = SlotIsLogical(MyReplicationSlot); + ReplicationSlotDropAcquired(); + + if (is_logical) + RequestDisableLogicalDecoding(); } /* @@ -1440,15 +1468,19 @@ void ReplicationSlotsDropDBSlots(Oid dboid) { int i; + int n_valid_logicalslots; + bool dropped = false; if (max_replication_slots <= 0) return; restart: + n_valid_logicalslots = 0; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s; + bool invalidated; char *slotname; int active_pid; @@ -1462,11 +1494,22 @@ ReplicationSlotsDropDBSlots(Oid dboid) if (!SlotIsLogical(s)) continue; + SpinLockAcquire(&s->mutex); + invalidated = s->data.invalidated == RS_INVAL_NONE; + SpinLockRelease(&s->mutex); + + /* + * Count slots on other databases too so we can disable logical + * decoding only if no slots in the cluster. + */ + if (invalidated) + n_valid_logicalslots++; + /* not our database, skip */ if (s->data.database != dboid) continue; - /* NB: intentionally including invalidated slots */ + /* NB: intentionally including invalidated slots to drop */ /* acquire slot, so ReplicationSlotDropAcquired can be reused */ SpinLockAcquire(&s->mutex); @@ -1518,11 +1561,55 @@ ReplicationSlotsDropDBSlots(Oid dboid) */ LWLockRelease(ReplicationSlotControlLock); ReplicationSlotDropAcquired(); + dropped = true; goto restart; } LWLockRelease(ReplicationSlotControlLock); + + if (dropped && n_valid_logicalslots == 0) + RequestDisableLogicalDecoding(); } +/* + * Returns true if there is at least one in-use valid logical replication slot. + */ +bool +CheckLogicalSlotExists(void) +{ + bool found = false; + + if (max_replication_slots <= 0) + return false; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s; + bool invalidated; + + s = &ReplicationSlotCtl->replication_slots[i]; + + /* cannot change while ReplicationSlotCtlLock is held */ + if (!s->in_use) + continue; + + if (SlotIsPhysical(s)) + continue; + + SpinLockAcquire(&s->mutex); + invalidated = s->data.invalidated != RS_INVAL_NONE; + SpinLockRelease(&s->mutex); + + if (invalidated) + continue; + + found = true; + break; + } + LWLockRelease(ReplicationSlotControlLock); + + return found; +} /* * Check whether the server's configuration supports using replication @@ -1683,7 +1770,7 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, break; case RS_INVAL_WAL_LEVEL: - appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server.")); + appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\".")); break; case RS_INVAL_IDLE_TIMEOUT: @@ -1825,10 +1912,11 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, * * Acquires the given slot and mark it invalid, if necessary and possible. * - * Returns whether ReplicationSlotControlLock was released in the interim (and - * in that case we're not holding the lock at return, otherwise we are). + * Returns true if the slot was invalidated. * - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.) + * Set *released_lock_out if ReplicationSlotControlLock was released in the + * interim (and in that case we're not holding the lock at return, otherwise + * we are). * * This is inherently racy, because we release the LWLock * for syscalls, so caller must restart if we return true. @@ -1838,10 +1926,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, - bool *invalidated) + bool *released_lock_out) { int last_signaled_pid = 0; bool released_lock = false; + bool invalidated = false; TimestampTz inactive_since = 0; for (;;) @@ -1930,7 +2019,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, } /* Let caller know */ - *invalidated = true; + invalidated = true; } SpinLockRelease(&s->mutex); @@ -2038,7 +2127,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock)); - return released_lock; + *released_lock_out = released_lock; + return invalidated; } /* @@ -2051,7 +2141,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given * db; dboid may be InvalidOid for shared relations - * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient + * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not + * logical. * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured * "idle_replication_slot_timeout" duration. * @@ -2068,6 +2159,8 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes, { XLogRecPtr oldestLSN; bool invalidated = false; + bool invalidated_logical = false; + int n_valid_logicalslots; Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon)); Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0); @@ -2079,25 +2172,51 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); restart: + n_valid_logicalslots = 0; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (int i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + bool islogical = SlotIsLogical(s); + bool released_lock = false; if (!s->in_use) continue; /* Prevent invalidation of logical slots during binary upgrade */ if (SlotIsLogical(s) && IsBinaryUpgrade) + { + SpinLockAcquire(&s->mutex); + if (s->data.invalidated == RS_INVAL_NONE) + n_valid_logicalslots++; + SpinLockRelease(&s->mutex); + continue; + } - if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid, - snapshotConflictHorizon, - &invalidated)) + if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, + dboid, snapshotConflictHorizon, + &released_lock)) { - /* if the lock was released, start from scratch */ - goto restart; + /* Remember we have invalidated a physical or logical slot */ + invalidated = true; + + /* + * Additionally, remember we have invalidated a logical slot too + * as we can request disabling logical decoding later. + */ + if (islogical) + invalidated_logical = true; } + + SpinLockAcquire(&s->mutex); + if (s->data.invalidated == RS_INVAL_NONE) + n_valid_logicalslots++; + SpinLockRelease(&s->mutex); + + /* if the lock was released, start from scratch */ + if (released_lock) + goto restart; } LWLockRelease(ReplicationSlotControlLock); @@ -2110,6 +2229,15 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes, ReplicationSlotsComputeRequiredLSN(); } + /* + * Request the checkpointer to disable logical decoding if no valid + * logical slots remain. If called by the checkpointer during a + * checkpoint, only the request is initiated; actual deactivation is + * deferred until after the checkpoint completes. + */ + if (invalidated_logical && n_valid_logicalslots == 0) + RequestDisableLogicalDecoding(); + return invalidated; } @@ -2639,12 +2767,12 @@ RestoreSlotFromDisk(const char *name) */ if (cp.slotdata.database != InvalidOid) { - if (wal_level < WAL_LEVEL_LOGICAL) + if (wal_level < WAL_LEVEL_REPLICA) ereport(FATAL, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"", + errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"", NameStr(cp.slotdata.name)), - errhint("Change \"wal_level\" to be \"logical\" or higher."))); + errhint("Change \"wal_level\" to be \"replica\" or higher."))); /* * In standby mode, the hot standby must be enabled. This check is diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 0478fc9c9770..9bfe5032ad6b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -18,6 +18,7 @@ #include "access/xlogutils.h" #include "funcapi.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "utils/builtins.h" @@ -136,6 +137,13 @@ create_logical_replication_slot(char *name, char *plugin, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, failover, false); + /* + * Ensure the logical decoding is enabled before initializing the logical + * decoding context. + */ + EnsureLogicalDecodingEnabled(); + Assert(IsLogicalDecodingEnabled()); + /* * Create logical decoding context to find start point or, if we don't * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fc8f8559073d..7cb1eb4464b1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -72,6 +72,7 @@ #include "postmaster/interrupt.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/snapbuild.h" @@ -1297,6 +1298,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) need_full_snapshot = true; } + /* + * Ensure the logical decoding is enabled before initializing the + * logical decoding context. + */ + EnsureLogicalDecodingEnabled(); + Assert(IsLogicalDecodingEnabled()); + ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, XL_ROUTINE(.page_read = logical_read_xlog_page, diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index b23d0c19360a..dd44267b43e6 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -32,6 +32,7 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" #include "postmaster/walsummarizer.h" +#include "replication/logicalctl.h" #include "replication/logicallauncher.h" #include "replication/origin.h" #include "replication/slot.h" @@ -140,6 +141,7 @@ CalculateShmemSize(void) size = add_size(size, SlotSyncShmemSize()); size = add_size(size, AioShmemSize()); size = add_size(size, WaitLSNShmemSize()); + size = add_size(size, LogicalDecodingCtlShmemSize()); /* include additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -328,6 +330,7 @@ CreateOrAttachShmemStructs(void) InjectionPointShmemInit(); AioShmemInit(); WaitLSNShmemInit(); + LogicalDecodingCtlShmemInit(); } /* diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 087821311cce..32cd4ebdb777 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -22,6 +22,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "port/pg_bitutils.h" +#include "replication/logicalctl.h" #include "replication/logicalworker.h" #include "replication/walsender.h" #include "storage/condition_variable.h" @@ -576,6 +577,9 @@ ProcessProcSignalBarrier(void) case PROCSIGNAL_BARRIER_SMGRRELEASE: processed = ProcessBarrierSmgrRelease(); break; + case PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO: + processed = ProcessBarrierUpdateXLogLogicalInfo(); + break; } /* diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 4222bdab0780..786cc3e2b0b5 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -24,6 +24,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/proc.h" @@ -499,7 +500,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, * seems OK, given that this kind of conflict should not normally be * reached, e.g. due to using a physical replication slot. */ - if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) + if (IsLogicalDecodingEnabled() && isCatalogRel) InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, snapshotConflictHorizon); } @@ -1285,6 +1286,7 @@ LogStandbySnapshot(void) RunningTransactions running; xl_standby_lock *locks; int nlocks; + bool logical_decoding_enabled = IsLogicalDecodingEnabled(); Assert(XLogStandbyInfoActive()); @@ -1325,13 +1327,13 @@ LogStandbySnapshot(void) * record. Fortunately this routine isn't executed frequently, and it's * only a shared lock. */ - if (wal_level < WAL_LEVEL_LOGICAL) + if (!logical_decoding_enabled) LWLockRelease(ProcArrayLock); recptr = LogCurrentRunningXacts(running); /* Release lock if we kept it longer ... */ - if (wal_level >= WAL_LEVEL_LOGICAL) + if (logical_decoding_enabled) LWLockRelease(ProcArrayLock); /* GetRunningTransactionData() acquired XidGenLock, we must release it */ diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index c1ac71ff7f24..3b1ad8855563 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -135,6 +135,8 @@ HASH_GROW_BUCKETS_ELECT "Waiting to elect a Parallel Hash participant to allocat HASH_GROW_BUCKETS_REALLOCATE "Waiting for an elected Parallel Hash participant to finish allocating more buckets." HASH_GROW_BUCKETS_REINSERT "Waiting for other Parallel Hash participants to finish inserting tuples into new buckets." LOGICAL_APPLY_SEND_DATA "Waiting for a logical replication leader apply process to send data to a parallel apply process." +LOGICAL_DECODING_STATUS_CHANGE "Waiting for logical decoding status change." +LOGICAL_DECODING_STATUS_CHANGE_DELAY "Waiting for recovery to complete to change logical decoding status." LOGICAL_PARALLEL_APPLY_STATE_CHANGE "Waiting for a logical replication parallel apply process to change state." LOGICAL_SYNC_DATA "Waiting for a logical replication remote server to send data for initial table synchronization." LOGICAL_SYNC_STATE_CHANGE "Waiting for a logical replication remote server to change state." @@ -358,6 +360,7 @@ InjectionPoint "Waiting to read or update information related to injection point SerialControl "Waiting to read or update shared pg_serial state." AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." WaitLSN "Waiting to read or update shared Wait-for-LSN state." +LogicalDecodingControl "Waiting to access logical decoding status information." # # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 02505c88b8e4..1d5e8e2d42ac 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -98,9 +98,9 @@ * likewise send the invalidation immediately, before ending the change's * critical section. This includes inplace heap updates, relmap, and smgr. * - * When wal_level=logical, write invalidations into WAL at each command end to - * support the decoding of the in-progress transactions. See - * CommandEndInvalidationMessages. + * When effective_wal_level is 'logical', write invalidations into WAL at + * each command end to support the decoding of the in-progress transactions. + * See CommandEndInvalidationMessages. * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -1419,7 +1419,7 @@ CommandEndInvalidationMessages(void) ProcessInvalidationMessages(&transInvalInfo->ii.CurrentCmdInvalidMsgs, LocalExecuteInvalidationMessage); - /* WAL Log per-command invalidation messages for wal_level=logical */ + /* WAL Log per-command invalidation messages for logical decoding */ if (XLogLogicalInfoActive()) LogLogicalInvalidations(); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 98f9598cd789..fc2d918a594a 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -40,6 +40,7 @@ #include "pgstat.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/walsender.h" @@ -653,6 +654,9 @@ BaseInit(void) /* Initialize lock manager's local structs */ InitLockManagerAccess(); + /* Initialize logical info WAL logging state */ + InitializeProcessXLogLogicalInfo(); + /* * Initialize replication slots after pgstat. The exit hook might need to * drop ephemeral slots, which in turn triggers stats reporting. diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 1128167c0251..e2372173ec1e 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -798,6 +798,15 @@ max => 'MAX_IO_CONCURRENCY', }, +{ name => 'effective_wal_level', type => 'enum', context => 'PGC_INTERNAL', group => 'PRESET_OPTIONS', + short_desc => 'Show effective WAL level.', + flags => 'GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE', + variable => 'effective_wal_level', + boot_val => 'WAL_LEVEL_REPLICA', + options => 'wal_level_options', + show_hook => 'show_effective_wal_level', +}, + { name => 'enable_async_append', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD', short_desc => 'Enables the planner\'s use of async append plans.', flags => 'GUC_EXPLAIN', diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index df41836e70f0..8624823ac0fb 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -907,7 +907,7 @@ check_publisher(const struct LogicalRepInfo *dbinfo) * Since these parameters are not a requirement for physical replication, * we should check it to make sure it won't fail. * - * - wal_level = logical + * - wal_level >= replica * - max_replication_slots >= current + number of dbs to be converted * - max_wal_senders >= current + number of dbs to be converted * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files) @@ -951,9 +951,9 @@ check_publisher(const struct LogicalRepInfo *dbinfo) disconnect_database(conn, false); - if (strcmp(wal_level, "logical") != 0) + if (strcmp(wal_level, "minimal") == 0) { - pg_log_error("publisher requires \"wal_level\" >= \"logical\""); + pg_log_error("publisher requires \"wal_level\" >= \"replica\""); failed = true; } diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 3d6086dc4893..cbdd3da2dba8 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -240,7 +240,6 @@ sub generate_db # Check some unmet conditions on node P $node_p->append_conf( 'postgresql.conf', q{ -wal_level = replica max_replication_slots = 1 max_wal_senders = 1 max_worker_processes = 2 @@ -265,7 +264,6 @@ sub generate_db # standby settings should not be a lower setting than on the primary. $node_p->append_conf( 'postgresql.conf', q{ -wal_level = logical max_replication_slots = 10 max_wal_senders = 10 max_worker_processes = 8 diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 1e17d64b3ec6..9cdeb15bd51f 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -2131,11 +2131,7 @@ check_new_cluster_replication_slots(void) wal_level = PQgetvalue(res, 0, 0); - if (nslots_on_old > 0 && strcmp(wal_level, "logical") != 0) - pg_fatal("\"wal_level\" must be \"logical\" but is set to \"%s\"", - wal_level); - - if (old_cluster.sub_retain_dead_tuples && + if ((nslots_on_old > 0 || old_cluster.sub_retain_dead_tuples) && strcmp(wal_level, "minimal") == 0) pg_fatal("\"wal_level\" must be \"replica\" or \"logical\" but is set to \"%s\"", wal_level); diff --git a/src/bin/pg_upgrade/t/002_pg_upgrade.pl b/src/bin/pg_upgrade/t/002_pg_upgrade.pl index 823f41e754ce..587b683aec12 100644 --- a/src/bin/pg_upgrade/t/002_pg_upgrade.pl +++ b/src/bin/pg_upgrade/t/002_pg_upgrade.pl @@ -225,6 +225,10 @@ sub get_dump_for_comparison # Override log_statement=all set by Cluster.pm. This avoids large amounts # of log traffic that slow this test down even more when run under valgrind. $oldnode->append_conf('postgresql.conf', 'log_statement = none'); + +# Set wal_level = replica to run the regression tests in the same +# wal_level as when 'make check' runs. +$oldnode->append_conf('postgresql.conf', 'wal_level = replica'); $oldnode->start; my $result; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 605280ed8fb6..a73726982d7d 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -94,6 +94,9 @@ typedef enum RecoveryState } RecoveryState; extern PGDLLIMPORT int wal_level; +extern PGDLLEXPORT int effective_wal_level; + +extern PGDLLEXPORT bool XLogLogicalInfo; /* Is WAL archiving enabled (always or only while server is running normally)? */ #define XLogArchivingActive() \ @@ -122,8 +125,17 @@ extern PGDLLIMPORT int wal_level; /* Do we need to WAL-log information required only for Hot Standby and logical replication? */ #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA) -/* Do we need to WAL-log information required only for logical replication? */ -#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) +/* + * Do we need to WAL-log information required only for logical replication? + * + * When XLogLogicalInfo is true, it enables logical-decoding-related WAL logging + * as if wal_level were set to 'logical', even if it's actually set to 'replica'. + * XLogLogicalInfo is a process-local variable, so the value returned by this + * macro might not reflect the latest state, but is sufficient for process-local + * WAL-logging decisions. See comments atop logicalctl.c for details on controlling + * the effective_wal_level. + */ +#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL || XLogLogicalInfo) #ifdef WAL_DEBUG extern PGDLLIMPORT bool XLOG_DEBUG; diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 63e834a6ce47..d62f6188d83e 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -41,6 +41,7 @@ typedef struct CheckPoint * timeline (equals ThisTimeLineID otherwise) */ bool fullPageWrites; /* current full_page_writes */ int wal_level; /* current wal_level */ + bool logicalDecodingEnabled; /* current logical decoding status */ FullTransactionId nextXid; /* next free transaction ID */ Oid nextOid; /* next free OID */ MultiXactId nextMulti; /* next free MultiXactId */ @@ -80,6 +81,7 @@ typedef struct CheckPoint /* 0xC0 is used in Postgres 9.5-11 */ #define XLOG_OVERWRITE_CONTRECORD 0xD0 #define XLOG_CHECKPOINT_REDO 0xE0 +#define XLOG_LOGICAL_DECODING_STATUS_CHANGE 0xF0 /* diff --git a/src/include/replication/logicalctl.h b/src/include/replication/logicalctl.h new file mode 100644 index 000000000000..fe8b7141af7c --- /dev/null +++ b/src/include/replication/logicalctl.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * + * logicalctl.h + * Definitions for logical decoding status control facility. + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/replication/logicalctl.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALCTL_H +#define LOGICALCTL_H + +extern Size LogicalDecodingCtlShmemSize(void); +extern void LogicalDecodingCtlShmemInit(void); +extern void StartupLogicalDecodingStatus(bool status_in_control_file); +extern void InitializeProcessXLogLogicalInfo(void); +extern bool ProcessBarrierUpdateXLogLogicalInfo(void); +extern bool IsLogicalDecodingEnabled(void); +extern bool IsXLogLogicalInfoEnabled(void); +extern void EnsureLogicalDecodingEnabled(void); +extern void RequestDisableLogicalDecoding(void); +extern void DisableLogicalDecodingIfNecessary(void); +extern void UpdateLogicalDecodingStatus(bool new_status, bool need_lock); +extern void UpdateLogicalDecodingStatusEndOfRecovery(void); +extern bool LogicalDecodingStatusChangeAllowed(void); + +#endif diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 09c69f83d579..1270c7894d21 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -329,6 +329,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); +extern bool CheckLogicalSlotExists(void); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index 5b0ce383408c..533344509e98 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -86,6 +86,7 @@ PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) PG_LWLOCK(54, WaitLSN) +PG_LWLOCK(55, LogicalDecodingControl) /* * There also exist several built-in LWLock tranches. As with the predefined diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index afeeb1ca019f..8e428f298c66 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -54,6 +54,8 @@ typedef enum typedef enum { PROCSIGNAL_BARRIER_SMGRRELEASE, /* ask smgr to close files */ + PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, /* ask to update + * XLogLogicalInfo */ } ProcSignalBarrierType; /* diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 82ac8646a8d4..fbe0b1e2e3dc 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -61,6 +61,7 @@ extern bool check_default_text_search_config(char **newval, void **extra, GucSou extern void assign_default_text_search_config(const char *newval, void *extra); extern bool check_default_with_oids(bool *newval, void **extra, GucSource source); +extern const char *show_effective_wal_level(void); extern bool check_huge_page_size(int *newval, void **extra, GucSource source); extern void assign_io_method(int newval, void *extra); extern bool check_io_max_concurrency(int *newval, void **extra, GucSource source); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 523a5cd5b527..3486df3c5613 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -58,6 +58,7 @@ tests += { 't/047_checkpoint_physical_slot.pl', 't/048_vacuum_horizon_floor.pl', 't/049_wait_for_lsn.pl', + 't/050_effective_wal_level.pl' ], }, } diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index ebe2fae17898..49d9ea4d0969 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -878,9 +878,10 @@ sub wait_until_vacuum_can_remove $handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr); -# We are not able to read from the slot as it requires wal_level >= logical on the primary server +# We are not able to read from the slot as it requires effective_wal_level >= logical on +# the primary server check_pg_recvlogical_stderr($handle, - "logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary" + "logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary" ); # Restore primary wal_level diff --git a/src/test/recovery/t/050_effective_wal_level.pl b/src/test/recovery/t/050_effective_wal_level.pl new file mode 100644 index 000000000000..9a04b653bdc8 --- /dev/null +++ b/src/test/recovery/t/050_effective_wal_level.pl @@ -0,0 +1,440 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group +# +# Test that effective_wal_level changes upon logical replication slot creation +# and deletion. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Check both wal_level and effective_wal_level values on the given node +# are expected. +sub test_wal_level +{ + my ($node, $expected, $msg) = @_; + + is( $node->safe_psql( + 'postgres', + qq[select current_setting('wal_level'), current_setting('effective_wal_level');] + ), + "$expected", + "$msg"); +} + +# Wait for the checkpointer to decrease effective_wal_level to 'replica'. +sub wait_for_logical_decoding_disabled +{ + my ($node) = @_; + + $node->poll_query_until('postgres', + qq[select current_setting('effective_wal_level') = 'replica';]); +} + +# Initialize the primary server with wal_level = 'replica'. +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 1); +$primary->append_conf('postgresql.conf', "log_min_messages = debug1"); +$primary->start(); + +# Check both initial wal_level and effective_wal_level values. +test_wal_level($primary, "replica|replica", + "wal_level and effective_wal_level start with the same value 'replica'"); + +# Create a physical slot. +$primary->safe_psql('postgres', + qq[select pg_create_physical_replication_slot('test_phy_slot', false, false)] +); + +# Physical slots don't affect effective_wal_level. +test_wal_level($primary, "replica|replica", + "effective_wal_level doesn't change with a new physical slot"); +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_phy_slot')]); + +# Create a temporary logical slot but exits without releasing it explicitly. +# This enables logical decoding but skips disabling it and delegates to the +# checkpointer. +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_tmp_slot', 'test_decoding', true)] +); + +# Wait for the checkpointer to disable logical decoding. +wait_for_logical_decoding_disabled($primary); + +# Create a new logical slot and check if effective_wal_level must be increased +# to 'logical'. +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); +test_wal_level($primary, "replica|logical", + "effective_wal_level increased to logical upon a logical slot creation"); + +# Restart the server and check again. +$primary->restart(); +test_wal_level($primary, "replica|logical", + "effective_wal_level remains logical even after a server restart"); + +# Create and drop another logical slot, then check if effective_wal_level remains +# 'logical'. +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot2', 'pgoutput')]); +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot2')]); +test_wal_level($primary, "replica|logical", + "effective_wal_level stays 'logical' as one slot remains"); + +# Check if the server cannot start with wal_level='minimal' as long as there is +# at least one replication slot. +$primary->adjust_conf('postgresql.conf', 'wal_level', 'minimal'); +$primary->adjust_conf('postgresql.conf', 'max_wal_senders', '0'); +$primary->stop; + +command_fails( + [ + 'pg_ctl', + '--pgdata' => $primary->data_dir, + '--log' => $primary->logfile, + 'start', + ], + "cannot server with wal_level='minimal' as there is in-use logical slot"); + +my $logfile = slurp_file($primary->logfile()); +like( + $logfile, + qr/logical replication slot "test_slot" exists, but "wal_level" < "replica"/, + 'logical slots requires logical decoding enabled at server startup'); + +# Revert the modified settings. +$primary->adjust_conf('postgresql.conf', 'wal_level', 'replica'); +$primary->adjust_conf('postgresql.conf', 'max_wal_senders', '10'); + +# Add other settings to test if we disable logical decoding when invalidating the last +# logical slot. +$primary->append_conf( + 'postgresql.conf', + qq[ +min_wal_size = 32MB +max_wal_size = 32MB +max_slot_wal_keep_size = 16MB +]); +$primary->start; + +# Advance WAL and check if the slot gets invalidated. +$primary->advance_wal(2); +$primary->safe_psql('postgres', qq[CHECKPOINT]); +is( $primary->safe_psql( + 'postgres', + qq[ +select invalidation_reason = 'wal_removed' from pg_replication_slots where slot_name = 'test_slot'; + ]), + 't', + 'test_slot gets invalidated due to wal_removed'); + +# Check if logical decoding is disabled after invalidating the last logical slot. +wait_for_logical_decoding_disabled($primary); +test_wal_level($primary, "replica|replica", + "effective_wal_level got decreased to 'replica' after invalidating the last logical slot" +); + +# Revert the modified settings, and restart the server. +$primary->adjust_conf('postgresql.conf', 'max_slot_wal_keep_size', undef); +$primary->adjust_conf('postgresql.conf', 'min_wal_size', undef); +$primary->adjust_conf('postgresql.conf', 'max_wal_size', undef); +$primary->restart; + +# Recreate the logical slot to enable logical decoding again. +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); + +# Take backup during the effective_wal_level being 'logical'. But note that +# replication slots are not included in the backup. +$primary->backup('my_backup'); + +# Initialize standby1 node. +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup($primary, 'my_backup', has_streaming => 1); +$standby1->start; + +# Check if the standby's effective_wal_level is 'logical' in spite of +# wal_level being 'replica'. +test_wal_level($standby1, "replica|logical", + "effective_wal_level is 'logical' on standby in spite of wal_level is 'replica'" +); + +# Promote the standby1 node that doesn't have any logical slot. So +# effective_wal_level should be decreased to 'replica' at promotion. +$standby1->promote; +test_wal_level($standby1, "replica|replica", + "effective_wal_level got decreased to 'replica' during promotion"); +$standby1->stop; + +# Initialize standby2 node. +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup($primary, 'my_backup', has_streaming => 1); +$standby2->start; + +# Creating a logical slot on standby should succeed as the primary enables +# it. +$primary->wait_for_replay_catchup($standby2); +$standby2->create_logical_slot_on_standby($primary, 'standby2_slot', + 'postgres'); + +# Promote the standby2 node that has one logical slot. So effective_wal_level +# remains 'logical' even after the promotion. +$standby2->promote; +test_wal_level($standby2, "replica|logical", + "effective_wal_level remains 'logical' even after the promotion"); + +# Confirm if we can create a logical slot after the promotion. +$standby2->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('standby2_slot2', 'pgoutput')] +); +$standby2->stop; + +# Initialize standby3 node and start it with wal_level = 'logical'. +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +$standby3->init_from_backup($primary, 'my_backup', has_streaming => 1); +$standby3->append_conf('postgresql.conf', qq[wal_level = 'logical']); +$standby3->start(); +$standby3->backup('my_backup3'); + +# Initialize cascade standby and start with wal_level = 'replica'. +my $cascade = PostgreSQL::Test::Cluster->new('cascade'); +$cascade->init_from_backup($standby3, 'my_backup3', has_streaming => 1); +$cascade->adjust_conf('postgresql.conf', 'wal_level', 'replica'); +$cascade->start(); + +# Regardless of their wal_level values, effective_wal_level values on the +# standby and the cascaded standby depend on the primary's value, 'logical'. +test_wal_level($standby3, "logical|logical", + "check wal_level and effective_wal_level on standby"); +test_wal_level($cascade, "replica|logical", + "check wal_level and effective_wal_level on cascaded standby"); + +# Drop the primary's last logical slot, decreasing effective_wal_level to +# replica on all nodes. +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); +wait_for_logical_decoding_disabled($primary); + +$primary->wait_for_replay_catchup($standby3); +$standby3->wait_for_replay_catchup($cascade, $primary); + +test_wal_level($primary, "replica|replica", + "effective_wal_level got decreased to 'replica' on primary"); +test_wal_level($standby3, "logical|replica", + "effective_wal_level got decreased to 'replica' on standby"); +test_wal_level($cascade, "replica|replica", + "effective_wal_level got decreased to 'replica' on cascaded standby"); + +# Promote standby3, increasing effective_wal_level to 'logical' as its wal_level +# is set to 'logical'. +$standby3->promote; + +# Check if effective_wal_level is increased to 'logical' on the cascaded standby. +$standby3->wait_for_replay_catchup($cascade); +test_wal_level($cascade, "replica|logical", + "effective_wal_level got increased to 'logical' on standby as the new primary has wal_level='logical'" +); + +$standby3->stop; +$cascade->stop; + +# Initialize standby4 node and start it. +my $standby4 = PostgreSQL::Test::Cluster->new('standby4'); +$standby4->init_from_backup($primary, 'my_backup', has_streaming => 1); +$standby4->start; + +# Create logical slots on both nodes. +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); +$primary->wait_for_replay_catchup($standby4); +$standby4->create_logical_slot_on_standby($primary, 'standby4_slot', + 'postgres'); + +# Drop the logical slot from the primary, decreasing effective_wal_level to +# 'replica' on the primary, which leads to invalidating the logical slot on the +# standby due to 'wal_level_insufficient'. +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); +wait_for_logical_decoding_disabled($primary); +test_wal_level($primary, "replica|replica", + "effective_wal_level got decreased to 'replica' on the primary to invalidate standby's slots" +); +$standby4->poll_query_until( + 'postgres', qq[ +select invalidation_reason = 'wal_level_insufficient' from pg_replication_slots where slot_name = 'standby4_slot' + ]); + +# Restart the server to check if the slot is successfully restored during +# startup. +$standby4->restart; + +# Check that the logical decoding is not enabled on the standby4. Note that it still has +# the invalidated logical slot. +test_wal_level($standby4, "replica|replica", + "effective_wal_level got decreased to 'replica' on standby"); + +my ($result, $stdout, $stderr) = $standby4->psql('postgres', + qq[select pg_logical_slot_get_changes('standby4_slot', null, null)]); +like( + $stderr, + qr/ERROR: logical decoding on standby requires "effective_wal_level" >= "logical" on the primary/, + "cannot use logical decoding on standby as it is disabled on primary"); + +# Restart the primary with setting wal_level = 'logical' and create a new logical +# slot. +$primary->append_conf('postgresql.conf', qq[wal_level = 'logical']); +$primary->restart; +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); + +# effective_wal_level should be 'logical' on both nodes. +$primary->wait_for_replay_catchup($standby4); +test_wal_level($primary, "logical|logical", + "check WAL levels on the primary node"); +test_wal_level($standby4, "replica|logical", + "effective_wal_level got increased to 'logical' again on standby"); + +# Set wal_level to 'replica' and restart the primary. Since one logical slot +# is still present on the primary, effective_wal_level remains 'logical' even +# if wal_level got decreased to 'replica'. +$primary->adjust_conf('postgresql.conf', 'wal_level', 'replica'); +$primary->restart; +$primary->wait_for_replay_catchup($standby4); + +# Check if the effective_wal_level remains 'logical' on the both nodes +test_wal_level($primary, "replica|logical", + "effective_wal_level remains 'logical' on primary even after setting wal_level to 'replica'" +); +test_wal_level($standby4, "replica|logical", + "effective_wal_level remains 'logical' on standby even after setting wal_level to 'replica' on primary" +); + +# Promote the standby4 and check if effective_wal_level got decreased to +# 'replica' after the promotion since there is no valid logical slot. +$standby4->promote; +test_wal_level($standby4, "replica|replica", + "effective_wal_level got decreased to 'replica' as there is no valid logical slot" +); + +# Cleanup the invalidated slot. +$standby4->safe_psql('postgres', + qq[select pg_drop_replication_slot('standby4_slot')]); + +$standby4->stop; + +# Test the race condition at end of the recovery between the startup and logical +# decoding status change. This test requires injection points enabled. +if ( $ENV{enable_injection_points} eq 'yes' + && $primary->check_extension('injection_points')) +{ + # Initialize standby5 and start it. + my $standby5 = PostgreSQL::Test::Cluster->new('standby5'); + $standby5->init_from_backup($primary, 'my_backup', has_streaming => 1); + $standby5->start; + + # Both servers have one logical slot. + $primary->wait_for_replay_catchup($standby5); + $standby5->create_logical_slot_on_standby($primary, 'standby5_slot', + 'postgres'); + + # Enable and attach the injection point on the standby5. + $primary->safe_psql('postgres', 'create extension injection_points'); + $primary->wait_for_replay_catchup($standby5); + $standby5->safe_psql('postgres', + qq[select injection_points_attach('startup-logical-decoding-status-change-end-of-recovery', 'wait');] + ); + + # Trigger promotion with no wait, and wait for the startup process to reach + # the injection point. + $standby5->safe_psql('postgres', qq[select pg_promote(false)]); + note('promote the standby and waiting for injection_point'); + $standby5->wait_for_event('startup', + 'startup-logical-decoding-status-change-end-of-recovery'); + note( + "injection_point 'startup-logical-decoding-status-change-end-of-recovery' is reached" + ); + + # Drop the logical slot, requesting to disable logical decoding to the checkpointer. + # It has to wait for the recovery to complete before disabling logical decoding. + $standby5->safe_psql('postgres', + qq[select pg_drop_replication_slot('standby5_slot');]); + + $standby5->wait_for_log( + "waiting for recovery completion to change logical decoding status"); + + # Resume the startup process to complete the recovery. + $standby5->safe_psql('postgres', + qq[select injection_points_wakeup('startup-logical-decoding-status-change-end-of-recovery')] + ); + + # Check if logical decoding got disabled after the recovery. + wait_for_logical_decoding_disabled($standby5); + test_wal_level($standby5, "replica|replica", + "effective_wal_level properly got decreased to 'replica'"); + $standby5->stop; + + # Test the abort process of logical decoding activation. We drop the primary's + # slot to decrease its effective_wal_level to 'replica'. + $primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); + wait_for_logical_decoding_disabled($primary); + test_wal_level($primary, "replica|replica", + "effective_wal_level got decreased to 'replica' on primary"); + + # Start two psql sessions to test the case where they try to enable logical + # decoding concurrently. + my $psql_create_slot_1 = $primary->background_psql('postgres'); + my $psql_create_slot_2 = $primary->background_psql('postgres'); + + # Start the logical decoding activation process upon creating the logical + # slot, but it will wait due to the injection point. + $psql_create_slot_1->query_until( + qr/create_slot_canceled/, + q(\echo create_slot_canceled +select injection_points_set_local(); +select injection_points_attach('logical-decoding-activation', 'wait'); +select pg_create_logical_replication_slot('slot_canceled', 'pgoutput'); +\q +)); + + $primary->wait_for_event('client backend', 'logical-decoding-activation'); + note("injection_point 'logical-decoding-activation' is reached"); + + # Start another activation process but it needs to wait for the first + # activation process to complete. + $psql_create_slot_2->query_until( + qr/create_slot_success/, + q(\echo create_slot_success +select pg_create_logical_replication_slot('test_slot', 'pgoutput'); +\q +)); + $primary->wait_for_event('client backend', 'LogicalDecodingStatusChange'); + + # Cancel the backend initiated by $psql_create_slot_1, aborting its activation + # process, letting the second activation process proceed. + $primary->safe_psql( + 'postgres', + qq[ +select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled' and pid <> pg_backend_pid() +]); + + # Check if the backend aborted the activation process. + $primary->wait_for_log("aborting logical decoding activation process"); + + # Wait for the logical slot 'test_slot' to be created. + $primary->poll_query_until('postgres', + qq[select exists (select 1 from pg_replication_slots where slot_name = 'test_slot')] + ); + + test_wal_level($primary, "replica|logical", + "effective_wal_level increased to 'logical'"); +} + +$primary->stop; + +done_testing(); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index e72d1308967e..7f81e61d7a75 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -288,12 +288,8 @@ CREATE PUBLICATION regress_pub_for_allsequences_alltables FOR ALL SEQUENCES, ALL SET client_min_messages = 'NOTICE'; CREATE PUBLICATION regress_pub_for_allsequences_alltables_withclause FOR ALL SEQUENCES, ALL TABLES WITH (publish = 'insert'); NOTICE: publication parameters are not applicable to sequence synchronization and will be ignored for sequences -WARNING: "wal_level" is insufficient to publish logical changes -HINT: Set "wal_level" to "logical" before creating subscriptions. CREATE PUBLICATION regress_pub_for_allsequences_withclause FOR ALL SEQUENCES WITH (publish_generated_columns = 'stored'); NOTICE: publication parameters are not applicable to sequence synchronization and will be ignored for sequences -WARNING: "wal_level" is insufficient to publish logical changes -HINT: Set "wal_level" to "logical" before creating subscriptions. RESET client_min_messages; SELECT pubname, puballtables, puballsequences FROM pg_publication WHERE pubname = 'regress_pub_for_allsequences_alltables'; pubname | puballtables | puballsequences diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 430c1246d14c..ecb79e794740 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -595,7 +595,7 @@ }); like( $reterr, - qr/WARNING: "wal_level" is insufficient to publish logical changes/, + qr/WARNING: logical decoding must be enabled to publish logical changes/, 'CREATE PUBLICATION while "wal_level=minimal"'); done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 432509277c98..8ef01ba8ee9c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1614,6 +1614,7 @@ LogicalDecodeStreamStopCB LogicalDecodeStreamTruncateCB LogicalDecodeTruncateCB LogicalDecodingContext +LogicalDecodingCtlData LogicalErrorCallbackState LogicalOutputPluginInit LogicalOutputPluginWriterPrepareWrite