From 60f0e5ec662106133434aa7ec0e68971bac98a31 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Mon, 27 Oct 2025 15:31:13 +0530 Subject: [PATCH 1/3] Add sequence synchronization for logical replication. This patch introduces sequence synchronization. Sequences that are synced will have 2 states: - INIT (needs [re]synchronizing) - READY (is already synchronized) A new sequencesync worker is launched as needed to synchronize sequences. A single sequencesync worker is responsible for synchronizing all sequences. It begins by retrieving the list of sequences that are flagged for synchronization, i.e., those in the INIT state. These sequences are then processed in batches, allowing multiple entries to be synchronized within a single transaction. The worker fetches the current sequence values and page LSNs from the remote publisher, updates the corresponding sequences on the local subscriber, and finally marks each sequence as READY upon successful synchronization. Sequence synchronization occurs in 3 places: 1) CREATE SUBSCRIPTION - The command syntax remains unchanged. - The subscriber retrieves sequences associated with publications. - Published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize all sequences. 2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION - The command syntax remains unchanged. - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize only newly added sequences. 3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES - A new command introduced for PG19 by f0b3573c3a. - All sequences in pg_subscription_rel are reset to INIT state. - Initiate the sequencesync worker to synchronize all sequences. - Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command, addition and removal of missing sequences will not be done in this case. Author: Vignesh C Reviewed-by: Amit Kapila Reviewed-by: shveta malik Reviewed-by: Hou Zhijie Reviewed-by: Masahiko Sawada Reviewed-by: Hayato Kuroda Reviewed-by: Dilip Kumar Reviewed-by: Peter Smith Reviewed-by: Nisha Moond Reviewed-by: Shlok Kyal Discussion: https://www.postgresql.org/message-id/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com --- src/backend/catalog/pg_subscription.c | 2 +- src/backend/commands/sequence.c | 18 +- src/backend/postmaster/bgworker.c | 5 +- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/launcher.c | 56 +- src/backend/replication/logical/meson.build | 1 + .../replication/logical/sequencesync.c | 730 ++++++++++++++++++ src/backend/replication/logical/syncutils.c | 140 +++- src/backend/replication/logical/tablesync.c | 77 +- src/backend/replication/logical/worker.c | 66 +- src/backend/utils/misc/guc_parameters.dat | 2 +- src/include/catalog/pg_proc.dat | 2 +- src/include/catalog/pg_subscription_rel.h | 23 + src/include/commands/sequence.h | 1 + src/include/replication/logicalworker.h | 3 +- src/include/replication/worker_internal.h | 22 +- src/test/subscription/t/036_sequences.pl | 175 ++++- src/tools/pgindent/typedefs.list | 2 + 18 files changed, 1209 insertions(+), 117 deletions(-) create mode 100644 src/backend/replication/logical/sequencesync.c diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 15b233a37d8d..1945627ed88e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -354,7 +354,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, ObjectIdGetDatum(relid), ObjectIdGetDatum(subid)); if (!HeapTupleIsValid(tup)) - elog(ERROR, "subscription table %u in subscription %u does not exist", + elog(ERROR, "subscription relation %u in subscription %u does not exist", relid, subid); /* Update the tuple. */ diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index c23dee5231c0..8d671b7a29d6 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -112,7 +112,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity, bool *is_called, bool *need_seq_rewrite, List **owned_by); -static void do_setval(Oid relid, int64 next, bool iscalled); static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity); @@ -954,8 +953,8 @@ lastval(PG_FUNCTION_ARGS) * it is the only way to clear the is_called flag in an existing * sequence. */ -static void -do_setval(Oid relid, int64 next, bool iscalled) +void +SetSequence(Oid relid, int64 next, bool iscalled) { SeqTable elm; Relation seqrel; @@ -1056,7 +1055,7 @@ do_setval(Oid relid, int64 next, bool iscalled) /* * Implement the 2 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. */ Datum setval_oid(PG_FUNCTION_ARGS) @@ -1064,14 +1063,14 @@ setval_oid(PG_FUNCTION_ARGS) Oid relid = PG_GETARG_OID(0); int64 next = PG_GETARG_INT64(1); - do_setval(relid, next, true); + SetSequence(relid, next, true); PG_RETURN_INT64(next); } /* * Implement the 3 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. */ Datum setval3_oid(PG_FUNCTION_ARGS) @@ -1080,7 +1079,7 @@ setval3_oid(PG_FUNCTION_ARGS) int64 next = PG_GETARG_INT64(1); bool iscalled = PG_GETARG_BOOL(2); - do_setval(relid, next, iscalled); + SetSequence(relid, next, iscalled); PG_RETURN_INT64(next); } @@ -1797,8 +1796,9 @@ pg_sequence_parameters(PG_FUNCTION_ARGS) /* * Return the sequence tuple along with its page LSN. * - * This is primarily intended for use by pg_dump to gather sequence data - * without needing to individually query each sequence relation. + * This is primarily used by pg_dump to efficiently collect sequence data + * without querying each sequence individually, and is also leveraged by + * logical replication while synchronizing sequences. */ Datum pg_get_sequence_data(PG_FUNCTION_ARGS) diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 1ad65c237c34..142a02eb5e95 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,7 +131,10 @@ static const struct "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, { - "TablesyncWorkerMain", TablesyncWorkerMain + "TableSyncWorkerMain", TableSyncWorkerMain + }, + { + "SequenceSyncWorkerMain", SequenceSyncWorkerMain } }; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c62c8c67521c..c719af1f8a94 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -26,6 +26,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + sequencesync.o \ slotsync.o \ snapbuild.o \ syncutils.o \ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 95b5cae9a552..bb49ad17dc7e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -248,9 +248,10 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, * Walks the workers array and searches for one that matches given worker type, * subscription id, and relation id. * - * For apply workers, the relid should be set to InvalidOid, as they manage - * changes across all tables. For table sync workers, the relid should be set - * to the OID of the relation being synchronized. + * For apply workers and sequencesync workers, the relid should be set to + * InvalidOid, as they manage changes across all tables. For tablesync + * workers, the relid should be set to the OID of the relation being + * synchronized. */ LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, @@ -334,6 +335,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, int nparallelapplyworkers; TimestampTz now; bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC); bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); /*---------- @@ -422,7 +424,8 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) + if ((is_tablesync_worker || is_sequencesync_worker) && + nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -478,6 +481,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, TIMESTAMP_NOBEGIN(worker->last_recv_time); worker->reply_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->reply_time); + worker->last_seqsync_start_time = 0; /* Before releasing lock, remember generation for future identification. */ generation = worker->generation; @@ -511,8 +515,16 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); break; + case WORKERTYPE_SEQUENCESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication sequencesync worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker"); + break; + case WORKERTYPE_TABLESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication tablesync worker for subscription %u sync %u", subid, @@ -848,6 +860,33 @@ logicalrep_launcher_onexit(int code, Datum arg) LogicalRepCtx->launcher_pid = 0; } +/* + * Reset the last_seqsync_start_time of the sequencesync worker in the + * subscription's apply worker. + * + * Note that this value is not stored in the sequencesync worker, because that + * has finished already and is about to exit. + */ +void +logicalrep_reset_seqsync_start_time(void) +{ + LogicalRepWorker *worker; + + /* + * The apply worker can't access last_seqsync_start_time concurrently, so + * it is okay to use SHARED lock here. See ProcessSequencesForSync(). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = logicalrep_worker_find(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid, + true); + if (worker) + worker->last_seqsync_start_time = 0; + + LWLockRelease(LogicalRepWorkerLock); +} + /* * Cleanup function. * @@ -896,7 +935,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isTablesyncWorker(w) && w->subid == subid) + if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w))) res++; } @@ -1610,7 +1649,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) + if (isTableSyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; @@ -1650,6 +1689,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) case WORKERTYPE_PARALLEL_APPLY: values[9] = CStringGetTextDatum("parallel apply"); break; + case WORKERTYPE_SEQUENCESYNC: + values[9] = CStringGetTextDatum("sequence synchronization"); + break; case WORKERTYPE_TABLESYNC: values[9] = CStringGetTextDatum("table synchronization"); break; diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 9283e996ef4a..a2268d8361ee 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -12,6 +12,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'sequencesync.c', 'slotsync.c', 'snapbuild.c', 'syncutils.c', diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c new file mode 100644 index 000000000000..623311b55ab6 --- /dev/null +++ b/src/backend/replication/logical/sequencesync.c @@ -0,0 +1,730 @@ +/*------------------------------------------------------------------------- + * sequencesync.c + * PostgreSQL logical replication: sequence synchronization + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/sequencesync.c + * + * NOTES + * This file contains code for sequence synchronization for + * logical replication. + * + * Sequences requiring synchronization are tracked in the pg_subscription_rel + * catalog. + * + * Sequences to be synchronized will be added with state INIT when either of + * the following commands is executed: + * CREATE SUBSCRIPTION + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * + * Executing the following command resets all sequences in the subscription to + * state INIT, triggering re-synchronization: + * ALTER SUBSCRIPTION ... REFRESH SEQUENCES + * + * The apply worker periodically scans pg_subscription_rel for sequences in + * INIT state. When such sequences are found, it spawns a sequencesync worker + * to handle synchronization. + * + * A single sequencesync worker is responsible for synchronizing all sequences. + * It begins by retrieving the list of sequences that are flagged for + * synchronization, i.e., those in the INIT state. These sequences are then + * processed in batches, allowing multiple entries to be synchronized within a + * single transaction. The worker fetches the current sequence values and page + * LSNs from the remote publisher, updates the corresponding sequences on the + * local subscriber, and finally marks each sequence as READY upon successful + * synchronization. + * + * Sequence state transitions follow this pattern: + * INIT → READY + * + * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH + * sequences are synchronized per transaction. The locks on the sequence + * relation will be periodically released at each transaction commit. + * + * XXX: We didn't choose launcher process to maintain the launch of sequencesync + * worker as it didn't have database connection to access the sequences from the + * pg_subscription_rel system catalog that need to be synchronized. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "catalog/pg_sequence.h" +#include "catalog/pg_subscription_rel.h" +#include "commands/sequence.h" +#include "pgstat.h" +#include "postmaster/interrupt.h" +#include "replication/logicalworker.h" +#include "replication/worker_internal.h" +#include "utils/acl.h" +#include "utils/fmgroids.h" +#include "utils/guc.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/pg_lsn.h" +#include "utils/syscache.h" +#include "utils/usercontext.h" + +#define REMOTE_SEQ_COL_COUNT 10 + +typedef enum CopySeqResult +{ + COPYSEQ_SUCCESS, + COPYSEQ_MISMATCH, + COPYSEQ_INSUFFICIENT_PERM, + COPYSEQ_SKIPPED +} CopySeqResult; + +static List *seqinfos = NIL; + +/* + * Apply worker determines if sequence synchronization is needed. + * + * Start a sequencesync worker if one is not already running. The active + * sequencesync worker will handle all pending sequence synchronization. If any + * sequences remain unsynchronized after it exits, a new worker can be started + * in the next iteration. + */ +void +ProcessSequencesForSync(void) +{ + LogicalRepWorker *sequencesync_worker; + int nsyncworkers; + bool has_pending_sequences; + bool started_tx; + + FetchRelationStates(NULL, &has_pending_sequences, &started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + if (!has_pending_sequences) + return; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Check if there is a sequencesync worker already running? */ + sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC, + MyLogicalRepWorker->subid, + InvalidOid, true); + if (sequencesync_worker) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * Count running sync workers for this subscription, while we have the + * lock. + */ + nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + LWLockRelease(LogicalRepWorkerLock); + + /* + * It is okay to read/update last_seqsync_start_time here in apply worker + * as we have already ensured that sync worker doesn't exist. + */ + launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid, + &MyLogicalRepWorker->last_seqsync_start_time); +} + +/* + * get_sequences_string + * + * Build a comma-separated string of schema-qualified sequence names + * for the given list of sequence indexes. + */ +static void +get_sequences_string(List *seqindexes, StringInfo buf) +{ + resetStringInfo(buf); + foreach_int(seqidx, seqindexes) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx); + + if (buf->len > 0) + appendStringInfoString(buf, ", "); + + appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname); + } +} + +/* + * report_sequence_errors + * + * Report discrepancies found during sequence synchronization between + * the publisher and subscriber. Emits warnings for: + * a) insufficient privileges + * b) mismatched definitions or concurrent rename + * c) missing sequences on the subscriber + * Then raises an ERROR to indicate synchronization failure. + */ +static void +report_sequence_errors(List *insuffperm_seqs_idx, List *mismatched_seqs_idx, + List *missing_seqs_idx) +{ + StringInfo seqstr = makeStringInfo(); + + if (insuffperm_seqs_idx) + { + get_sequences_string(insuffperm_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("insufficient privileges on sequence (%s)", + "insufficient privileges on sequences (%s)", + list_length(insuffperm_seqs_idx), + seqstr->data)); + } + + if (mismatched_seqs_idx) + { + get_sequences_string(mismatched_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("mismatched or renamed sequence on subscriber (%s)", + "mismatched or renamed sequences on subscriber (%s)", + list_length(mismatched_seqs_idx), + seqstr->data)); + } + + if (missing_seqs_idx) + { + get_sequences_string(missing_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("missing sequence on publisher (%s)", + "missing sequences on publisher (%s)", + list_length(missing_seqs_idx), + seqstr->data)); + } + + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication sequence synchronization failed for subscription \"%s\"", + MySubscription->name)); +} + +/* + * get_and_validate_seq_info + * + * Extracts remote sequence information from the tuple slot received from the + * publisher, and validates it against the corresponding local sequence + * definition. + */ +static CopySeqResult +get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, + LogicalRepSequenceInfo **seqinfo, int *seqidx) +{ + bool isnull; + int col = 0; + Oid remote_typid; + int64 remote_start; + int64 remote_increment; + int64 remote_min; + int64 remote_max; + bool remote_cycle; + CopySeqResult result = COPYSEQ_SUCCESS; + HeapTuple tup; + Form_pg_sequence local_seq; + LogicalRepSequenceInfo *seqinfo_local; + + *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Identify the corresponding local sequence for the given index. */ + *seqinfo = seqinfo_local = + (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx); + + seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Sanity check */ + Assert(col == REMOTE_SEQ_COL_COUNT); + + seqinfo_local->found_on_pub = true; + + *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock); + + /* Sequence was concurrently dropped? */ + if (!*sequence_rel) + return COPYSEQ_SKIPPED; + + tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid)); + + /* Sequence was concurrently dropped? */ + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for sequence %u", + seqinfo_local->localrelid); + + local_seq = (Form_pg_sequence) GETSTRUCT(tup); + + /* Sequence parameters for remote/local are the same? */ + if (local_seq->seqtypid != remote_typid || + local_seq->seqstart != remote_start || + local_seq->seqincrement != remote_increment || + local_seq->seqmin != remote_min || + local_seq->seqmax != remote_max || + local_seq->seqcycle != remote_cycle) + result = COPYSEQ_MISMATCH; + + /* Sequence was concurrently renamed? */ + if (strcmp(seqinfo_local->nspname, + get_namespace_name(RelationGetNamespace(*sequence_rel))) || + strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel))) + result = COPYSEQ_MISMATCH; + + ReleaseSysCache(tup); + return result; +} + +/* + * Apply remote sequence state to local sequence and mark it as + * synchronized (READY). + */ +static CopySeqResult +copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner) +{ + UserContext ucxt; + AclResult aclresult; + bool run_as_owner = MySubscription->runasowner; + Oid seqoid = seqinfo->localrelid; + + /* + * If the user did not opt to run as the owner of the subscription + * ('run_as_owner'), then copy the sequence as the owner of the sequence. + */ + if (!run_as_owner) + SwitchToUntrustedUser(seqowner, &ucxt); + + aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE); + + if (aclresult != ACLCHECK_OK) + { + if (!run_as_owner) + RestoreUserContext(&ucxt); + + return COPYSEQ_INSUFFICIENT_PERM; + } + + SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called); + + if (!run_as_owner) + RestoreUserContext(&ucxt); + + /* + * Record the remote sequence's LSN in pg_subscription_rel and mark the + * sequence as READY. + */ + UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY, + seqinfo->page_lsn, false); + + return COPYSEQ_SUCCESS; +} + +/* + * Copy existing data of sequences from the publisher. + */ +static void +copy_sequences(WalReceiverConn *conn) +{ + int cur_batch_base_index = 0; + int n_seqinfos = list_length(seqinfos); + List *mismatched_seqs_idx = NIL; + List *missing_seqs_idx = NIL; + List *insuffperm_seqs_idx = NIL; + StringInfo seqstr = makeStringInfo(); + StringInfo cmd = makeStringInfo(); + MemoryContext oldctx; + +#define MAX_SEQUENCES_SYNC_PER_BATCH 100 + + ereport(LOG, + errmsg("logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d", + MySubscription->name, n_seqinfos)); + + while (cur_batch_base_index < n_seqinfos) + { + Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID, + BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID}; + int batch_size = 0; + int batch_succeeded_count = 0; + int batch_mismatched_count = 0; + int batch_skipped_count = 0; + int batch_insuffperm_count = 0; + int batch_missing_count; + Relation sequence_rel; + + WalRcvExecResult *res; + TupleTableSlot *slot; + + StartTransactionCommand(); + + for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); + + if (seqstr->len > 0) + appendStringInfoString(seqstr, ", "); + + appendStringInfo(seqstr, "(\'%s\', \'%s\', %d)", + seqinfo->nspname, seqinfo->seqname, idx); + + if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH) + break; + } + + /* + * We deliberately avoid acquiring a local lock on the sequence before + * querying the publisher to prevent potential distributed deadlocks + * in bi-directional replication setups. + * + * Example scenario: + * + * - On each node, a background worker acquires a lock on a sequence + * as part of a sync operation. + * + * - Concurrently, a user transaction attempts to alter the same + * sequence, waiting on the background worker's lock. + * + * - Meanwhile, a query from the other node tries to access metadata + * that depends on the completion of the alter operation. + * + * - This creates a circular wait across nodes: + * + * Node-1: Query -> waits on Alter -> waits on Sync Worker + * + * Node-2: Query -> waits on Alter -> waits on Sync Worker + * + * Since each node only sees part of the wait graph, the deadlock may + * go undetected, leading to indefinite blocking. + * + * Note: Each entry in VALUES includes an index 'seqidx' that + * represents the sequence's position in the local 'seqinfos' list. + * This index is propagated to the query results and later used to + * directly map the fetched publisher sequence rows back to their + * corresponding local entries without relying on result order or name + * matching. + */ + appendStringInfo(cmd, + "SELECT s.seqidx, ps.*, seq.seqtypid,\n" + " seq.seqstart, seq.seqincrement, seq.seqmin,\n" + " seq.seqmax, seq.seqcycle\n" + "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n" + "JOIN pg_namespace n ON n.nspname = s.schname\n" + "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n" + "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n" + "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n", + seqstr->data); + + res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow); + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch sequence information from the publisher: %s", + res->err)); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + CopySeqResult sync_status; + LogicalRepSequenceInfo *seqinfo; + int seqidx; + + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + sync_status = get_and_validate_seq_info(slot, &sequence_rel, + &seqinfo, &seqidx); + if (sync_status == COPYSEQ_SUCCESS) + sync_status = copy_sequence(seqinfo, + sequence_rel->rd_rel->relowner); + + switch (sync_status) + { + case COPYSEQ_MISMATCH: + + /* + * Remember mismatched sequences in a long-lived memory + * context, since these will be used after the transaction + * commits. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + mismatched_seqs_idx = lappend_int(mismatched_seqs_idx, + seqidx); + MemoryContextSwitchTo(oldctx); + batch_mismatched_count++; + break; + case COPYSEQ_INSUFFICIENT_PERM: + + /* + * Remember the sequences with insufficient privileges in + * a long-lived memory context, since these will be used + * after the transaction commits. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx, + seqidx); + MemoryContextSwitchTo(oldctx); + batch_insuffperm_count++; + break; + case COPYSEQ_SKIPPED: + ereport(LOG, + errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently", + seqinfo->nspname, + seqinfo->seqname)); + batch_skipped_count++; + break; + case COPYSEQ_SUCCESS: + elog(DEBUG1, + "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished", + MySubscription->name, seqinfo->nspname, + seqinfo->seqname); + batch_succeeded_count++; + break; + } + + if (sequence_rel) + table_close(sequence_rel, NoLock); + } + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + resetStringInfo(seqstr); + resetStringInfo(cmd); + + batch_missing_count = batch_size - (batch_succeeded_count + + batch_mismatched_count + + batch_insuffperm_count + + batch_skipped_count); + + elog(DEBUG1, + "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d skipped, %d mismatched, %d insufficient permission, %d missing from publisher", + MySubscription->name, + (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, + batch_size, batch_succeeded_count, batch_skipped_count, + batch_mismatched_count, batch_insuffperm_count, + batch_missing_count); + + /* Commit this batch, and prepare for next batch */ + CommitTransactionCommand(); + + if (batch_missing_count) + { + for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); + + /* If the sequence was not found on publisher, record it */ + if (!seqinfo->found_on_pub) + missing_seqs_idx = lappend_int(missing_seqs_idx, idx); + } + } + + /* + * cur_batch_base_index is not incremented sequentially because some + * sequences may be missing, and the number of fetched rows may not + * match the batch size. + */ + cur_batch_base_index += batch_size; + } + + /* Report permission issues, mismatches, or missing sequences */ + if (insuffperm_seqs_idx || mismatched_seqs_idx || missing_seqs_idx) + report_sequence_errors(insuffperm_seqs_idx, mismatched_seqs_idx, + missing_seqs_idx); +} + +/* + * Identifies sequences that require synchronization and initiates the + * synchronization process. + */ +static void +LogicalRepSyncSequences(void) +{ + char *err; + bool must_use_password; + Relation rel; + HeapTuple tup; + ScanKeyData skey[2]; + SysScanDesc scan; + Oid subid = MyLogicalRepWorker->subid; + StringInfoData app_name; + + StartTransactionCommand(); + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[0], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + ScanKeyInit(&skey[1], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(SUBREL_STATE_INIT)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, 2, skey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + LogicalRepSequenceInfo *seq; + Relation sequence_rel; + MemoryContext oldctx; + + CHECK_FOR_INTERRUPTS(); + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock); + + /* Skip if sequence was dropped concurrently */ + if (!sequence_rel) + continue; + + /* Skip if the relation is not a sequence */ + if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE) + continue; + + /* + * Worker needs to process sequences across transaction boundary, so + * allocate them under long-lived context. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + + seq = palloc0_object(LogicalRepSequenceInfo); + seq->localrelid = subrel->srrelid; + seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); + seq->seqname = pstrdup(RelationGetRelationName(sequence_rel)); + seqinfos = lappend(seqinfos, seq); + + MemoryContextSwitchTo(oldctx); + + table_close(sequence_rel, NoLock); + } + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + CommitTransactionCommand(); + + /* + * Exit early if no catalog entries found, likely due to concurrent drops. + */ + if (!seqinfos) + return; + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !MySubscription->ownersuperuser; + + initStringInfo(&app_name); + appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT, + MySubscription->oid, GetSystemIdentifier()); + + /* + * Establish the connection to the publisher for sequence synchronization. + */ + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, true, + must_use_password, + app_name.data, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s", + MySubscription->name, err)); + + pfree(app_name.data); + + copy_sequences(LogRepWorkerWalRcvConn); +} + +/* + * Execute the initial sync with error handling. Disable the subscription, + * if required. + * + * Note that we don't handle FATAL errors which are probably because of system + * resource error and are not repeatable. + */ +static void +start_sequence_sync() +{ + Assert(am_sequencesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + LogicalRepSyncSequences(); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during sequence synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + +/* Logical Replication sequencesync worker entry point */ +void +SequenceSyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + start_sequence_sync(); + + FinishSyncWorker(); +} diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index ae8c93859168..19f989410532 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -16,6 +16,7 @@ #include "catalog/pg_subscription_rel.h" #include "pgstat.h" +#include "replication/logicallauncher.h" #include "replication/worker_internal.h" #include "storage/ipc.h" #include "utils/lsyscache.h" @@ -48,6 +49,8 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE pg_noreturn void FinishSyncWorker(void) { + Assert(am_sequencesync_worker() || am_tablesync_worker()); + /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. @@ -61,16 +64,32 @@ FinishSyncWorker(void) /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); + if (am_sequencesync_worker()) + { + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", + MySubscription->name)); + + /* + * Find the leader apply worker and reset last_seqsync_start_time. + * This ensures that the apply worker can restart the sequence sync + * worker promptly whenever required. + */ + logicalrep_reset_seqsync_start_time(); + } + else + { + StartTransactionCommand(); + ereport(LOG, + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + CommitTransactionCommand(); - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, - InvalidOid); + /* Find the leader apply worker and signal it. */ + logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, + InvalidOid); + } /* Stop gracefully */ proc_exit(0); @@ -86,7 +105,52 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue) } /* - * Process possible state change(s) of relations that are being synchronized. + * Attempt to launch a sync worker for one or more sequences or a table, if + * a worker slot is available and the retry interval has elapsed. + * + * wtype: sync worker type. + * nsyncworkers: Number of currently running sync workers for the subscription. + * relid: InvalidOid for sequencesync worker, actual relid for tablesync + * worker. + * last_start_time: Pointer to the last start time of the worker. + */ +void +launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, + TimestampTz *last_start_time) +{ + TimestampTz now; + + Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) || + (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid))); + + /* If there is a free sync worker slot, start a new sync worker */ + if (nsyncworkers >= max_sync_workers_per_subscription) + return; + + now = GetCurrentTimestamp(); + + if (!(*last_start_time) || + TimestampDifferenceExceeds(*last_start_time, now, + wal_retrieve_retry_interval)) + { + /* + * Set the last_start_time even if we fail to start the worker, so + * that we won't retry until wal_retrieve_retry_interval has elapsed. + */ + *last_start_time = now; + (void) logicalrep_worker_launch(wtype, + MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + relid, DSM_HANDLE_INVALID, false); + } +} + +/* + * Process possible state change(s) of relations that are being synchronized + * and start new tablesync workers for the newly added tables. Also, start a + * new sequencesync worker for the newly added sequences. */ void ProcessSyncingRelations(XLogRecPtr current_lsn) @@ -108,6 +172,12 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) case WORKERTYPE_APPLY: ProcessSyncingTablesForApply(current_lsn); + ProcessSequencesForSync(); + break; + + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "sequence synchronization worker is not expected to process relations"); break; case WORKERTYPE_UNKNOWN: @@ -117,17 +187,29 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) } /* - * Common code to fetch the up-to-date sync state info into the static lists. + * Common code to fetch the up-to-date sync state info for tables and sequences. * - * Returns true if subscription has 1 or more tables, else false. + * The pg_subscription_rel catalog is shared by tables and sequences. Changes + * to either sequences or tables can affect the validity of relation states, so + * we identify non-READY tables and non-READY sequences together to ensure + * consistency. * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. + * has_pending_subtables: true if the subscription has one or more tables that + * are not in READY state, otherwise false. + * has_pending_subsequences: true if the subscription has one or more sequences + * that are not in READY state, otherwise false. */ -bool -FetchRelationStates(bool *started_tx) +void +FetchRelationStates(bool *has_pending_subtables, + bool *has_pending_subsequences, + bool *started_tx) { + /* + * has_subtables and has_subsequences_non_ready are declared as static, + * since the same value can be used until the system table is invalidated. + */ static bool has_subtables = false; + static bool has_subsequences_non_ready = false; *started_tx = false; @@ -135,10 +217,10 @@ FetchRelationStates(bool *started_tx) { MemoryContext oldctx; List *rstates; - ListCell *lc; SubscriptionRelState *rstate; relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; + has_subsequences_non_ready = false; /* Clean the old lists. */ list_free_deep(table_states_not_ready); @@ -150,17 +232,23 @@ FetchRelationStates(bool *started_tx) *started_tx = true; } - /* Fetch tables that are in non-ready state. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true, false, + /* Fetch tables and sequences that are in non-READY state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true, true, true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) + foreach_ptr(SubscriptionRelState, subrel, rstates) { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); + if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE) + has_subsequences_non_ready = true; + else + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, subrel, sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, + rstate); + } } MemoryContextSwitchTo(oldctx); @@ -185,5 +273,9 @@ FetchRelationStates(bool *started_tx) relation_states_validity = SYNC_RELATIONS_STATE_VALID; } - return has_subtables; + if (has_pending_subtables) + *has_pending_subtables = has_subtables; + + if (has_pending_subsequences) + *has_pending_subsequences = has_subsequences_non_ready; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 58c98488d7b7..e5a2856fd174 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -374,14 +374,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) }; static HTAB *last_start_times = NULL; ListCell *lc; - bool started_tx = false; + bool started_tx; bool should_exit = false; Relation rel = NULL; Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - FetchRelationStates(&started_tx); + FetchRelationStates(NULL, NULL, &started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid @@ -415,6 +415,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE); + if (rstate->state == SUBREL_STATE_SYNCDONE) { /* @@ -428,11 +436,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } /* * Remove the tablesync origin tracking if exists. @@ -552,43 +555,19 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) */ int nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + struct tablesync_start_time_mapping *hentry; + bool found; /* Now safe to release the LWLock */ LWLockRelease(LogicalRepWorkerLock); - /* - * If there are free sync worker slot(s), start a new sync - * worker for the table. - */ - if (nsyncworkers < max_sync_workers_per_subscription) - { - TimestampTz now = GetCurrentTimestamp(); - struct tablesync_start_time_mapping *hentry; - bool found; + hentry = hash_search(last_start_times, &rstate->relid, + HASH_ENTER, &found); + if (!found) + hentry->last_start_time = 0; - hentry = hash_search(last_start_times, &rstate->relid, - HASH_ENTER, &found); - - if (!found || - TimestampDifferenceExceeds(hentry->last_start_time, now, - wal_retrieve_retry_interval)) - { - /* - * Set the last_start_time even if we fail to start - * the worker, so that we won't retry until - * wal_retrieve_retry_interval has elapsed. - */ - hentry->last_start_time = now; - (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC, - MyLogicalRepWorker->dbid, - MySubscription->oid, - MySubscription->name, - MyLogicalRepWorker->userid, - rstate->relid, - DSM_HANDLE_INVALID, - false); - } - } + launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers, + rstate->relid, &hentry->last_start_time); } } } @@ -1432,8 +1411,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) } /* - * Make sure that the copy command runs as the table owner, unless the - * user has opted out of that behaviour. + * If the user did not opt to run as the owner of the subscription + * ('run_as_owner'), then copy the table as the owner of the table. */ run_as_owner = MySubscription->runasowner; if (!run_as_owner) @@ -1596,7 +1575,7 @@ run_tablesync_worker() /* Logical Replication Tablesync worker entry point */ void -TablesyncWorkerMain(Datum main_arg) +TableSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); @@ -1618,11 +1597,11 @@ TablesyncWorkerMain(Datum main_arg) bool AllTablesyncsReady(void) { - bool started_tx = false; - bool has_subrels = false; + bool started_tx; + bool has_tables; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchRelationStates(&started_tx); + FetchRelationStates(&has_tables, NULL, &started_tx); if (started_tx) { @@ -1634,7 +1613,7 @@ AllTablesyncsReady(void) * Return false when there are no tables in subscription or not all tables * are in ready state; true otherwise. */ - return has_subrels && (table_states_not_ready == NIL); + return has_tables && (table_states_not_ready == NIL); } /* @@ -1649,10 +1628,10 @@ bool HasSubscriptionTablesCached(void) { bool started_tx; - bool has_subrels; + bool has_tables; /* We need up-to-date subscription tables info here */ - has_subrels = FetchRelationStates(&started_tx); + FetchRelationStates(&has_tables, NULL, &started_tx); if (started_tx) { @@ -1660,7 +1639,7 @@ HasSubscriptionTablesCached(void) pgstat_report_stat(true); } - return has_subrels; + return has_tables; } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7edd1c9cf060..cbf311efd57e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -702,6 +702,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "sequence synchronization worker is not expected to apply changes"); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -1243,7 +1248,10 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1365,7 +1373,10 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -1421,7 +1432,10 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1487,7 +1501,10 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1622,7 +1639,10 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -2465,7 +2485,10 @@ apply_handle_stream_commit(StringInfo s) break; } - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -4137,7 +4160,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) AcceptInvalidationMessages(); maybe_reread_subscription(); - /* Process any table synchronization changes. */ + /* + * Process any relations that are being synchronized in parallel + * and any newly added tables or sequences. + */ ProcessSyncingRelations(last_received); } @@ -5700,8 +5726,8 @@ run_apply_worker() } /* - * Common initialization for leader apply worker, parallel apply worker and - * tablesync worker. + * Common initialization for leader apply worker, parallel apply worker, + * tablesync worker and sequencesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. @@ -5812,6 +5838,10 @@ InitializeLogRepWorker(void) (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + else if (am_sequencesync_worker()) + ereport(LOG, + (errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started", + MySubscription->name))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", @@ -5831,14 +5861,16 @@ replorigin_reset(int code, Datum arg) replorigin_session_origin_timestamp = 0; } -/* Common function to setup the leader apply or tablesync worker. */ +/* + * Common function to setup the leader apply, tablesync and sequencesync worker. + */ void SetupApplyOrSyncWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); - Assert(am_tablesync_worker() || am_leader_apply_worker()); + Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker()); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -5921,9 +5953,15 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); + if (am_leader_apply_worker() || am_tablesync_worker()) + { + /* + * Report the worker failed during either table synchronization or + * apply. + */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, + !am_tablesync_worker()); + } /* Disable the subscription */ StartTransactionCommand(); diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 25da769eb359..1128167c0251 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2058,7 +2058,7 @@ }, { name => 'max_sync_workers_per_subscription', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_SUBSCRIBERS', - short_desc => 'Maximum number of table synchronization workers per subscription.', + short_desc => 'Maximum number of workers per subscription for synchronizing tables and sequences.', variable => 'max_sync_workers_per_subscription', boot_val => '2', min => '0', diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9121a382f76b..34b7fddb0e7a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -3433,7 +3433,7 @@ proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u', prorettype => 'int8', proargtypes => 'regclass', prosrc => 'pg_sequence_last_value' }, -{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump', +{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump and sequence synchronization', proname => 'pg_get_sequence_data', provolatile => 'v', proparallel => 'u', prorettype => 'record', proargtypes => 'regclass', proallargtypes => '{regclass,int8,bool,pg_lsn}', proargmodes => '{i,o,o,o}', diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 9f88498ecd3f..9508e553b52a 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -82,6 +82,29 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; +/* + * Holds local sequence identity and corresponding publisher values used during + * sequence synchronization. + */ +typedef struct LogicalRepSequenceInfo +{ + /* Sequence information retrieved from the local node */ + char *seqname; + char *nspname; + Oid localrelid; + + /* Sequence information retrieved from the publisher node */ + XLogRecPtr page_lsn; + int64 last_value; + bool is_called; + + /* + * True if the sequence identified by (nspname, seqname) exists on the + * publisher. + */ + bool found_on_pub; +} LogicalRepSequenceInfo; + extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 9ac0b67683d3..46b4d89dd6ea 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -60,6 +60,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void SequenceChangePersistence(Oid relid, char newrelpersistence); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void SetSequence(Oid relid, int64 next, bool is_called); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *record); diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 88912606e4d5..56fa79b648e7 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); -extern void TablesyncWorkerMain(Datum main_arg); +extern void TableSyncWorkerMain(Datum main_arg); +extern void SequenceSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index e23fa9a4514e..f081619f1513 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, + WORKERTYPE_SEQUENCESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; @@ -106,6 +107,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + TimestampTz last_seqsync_start_time; } LogicalRepWorker; /* @@ -271,6 +274,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); +extern void logicalrep_reset_seqsync_start_time(void); extern int logicalrep_sync_worker_count(Oid subid); extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, @@ -282,11 +286,15 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state); extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); +extern void ProcessSequencesForSync(void); pg_noreturn extern void FinishSyncWorker(void); extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue); +extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, + Oid relid, TimestampTz *last_start_time); extern void ProcessSyncingRelations(XLogRecPtr current_lsn); -extern bool FetchRelationStates(bool *started_tx); +extern void FetchRelationStates(bool *has_pending_subtables, + bool *has_pending_sequences, bool *started_tx); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); @@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); +} + +static inline bool +am_sequencesync_worker(void) +{ + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl index 557fc91c017b..ef40ca979b1e 100644 --- a/src/test/subscription/t/036_sequences.pl +++ b/src/test/subscription/t/036_sequences.pl @@ -1,7 +1,7 @@ # Copyright (c) 2025, PostgreSQL Global Development Group -# This tests that sequences are registered to be synced to the subscriber +# This tests that sequences are synced correctly to the subscriber use strict; use warnings; use PostgreSQL::Test::Cluster; @@ -14,6 +14,7 @@ # Avoid checkpoint during the test, otherwise, extra values will be fetched for # the sequences which will cause the test to fail randomly. $node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'checkpoint_timeout = 1h'); $node_publisher->start; # Initialize subscriber node @@ -28,7 +29,14 @@ ); $node_publisher->safe_psql('postgres', $ddl); -# Setup the same structure on the subscriber +# Setup the same structure on the subscriber, plus some extra sequences that +# we'll create on the publisher later +$ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; + CREATE SEQUENCE regress_s2; + CREATE SEQUENCE regress_s3; +); $node_subscriber->safe_psql('postgres', $ddl); # Insert initial test data @@ -46,10 +54,165 @@ "CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub" ); -# Confirm sequences can be listed in pg_subscription_rel -my $result = $node_subscriber->safe_psql('postgres', - "SELECT relname, srsubstate FROM pg_class, pg_subscription_rel WHERE oid = srrelid" +# Wait for initial sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|0|t', 'initial test data replicated'); + +########## +## ALTER SUBSCRIPTION ... REFRESH PUBLICATION should cause sync of new +# sequences of the publisher, but changes to existing sequences should +# not be synced. +########## + +# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1' +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s2; + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION; +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|31|t', 'Check sequence value in the publisher'); + +# Check - existing sequence ('regress_s1') is not synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|0|t', 'REFRESH PUBLICATION will not sync existing sequence'); + +# Check - newly published sequence ('regress_s2') is synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '100|0|t', + 'REFRESH PUBLICATION will sync newly published sequence'); + +########## +# Test: REFRESH SEQUENCES and REFRESH PUBLICATION (copy_data = off) +# +# 1. ALTER SUBSCRIPTION ... REFRESH SEQUENCES should re-synchronize all +# existing sequences, but not synchronize newly added ones. +# 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION with (copy_data = off) should +# also not update sequence values for newly added sequences. +########## + +# Create a new sequence 'regress_s3', and update the existing sequence +# 'regress_s2'. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s3; + INSERT INTO regress_seq_test SELECT nextval('regress_s3') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); +)); + +# 1. Do ALTER SUBSCRIPTION ... REFRESH SEQUENCES +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES; +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - existing sequences ('regress_s1' and 'regress_s2') are synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|0|t', 'REFRESH SEQUENCES will sync existing sequences'); +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '200|0|t', 'REFRESH SEQUENCES will sync existing sequences'); + +# Check - newly published sequence ('regress_s3') is not synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s3; +)); +is($result, '1|0|f', + 'REFRESH SEQUENCES will not sync newly published sequence'); + +# 2. Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data as false +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION WITH (copy_data = false); +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - newly published sequence ('regress_s3') is not synced when +# (copy_data = off). +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s3; +)); +is($result, '1|0|f', + 'REFRESH PUBLICATION will not sync newly published sequence with copy_data as off' ); -is($result, 'regress_s1|i', "Sequence can be in pg_subscription_rel catalog"); + +########## +# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should report an error when: +# a) sequence definitions differ between the publisher and subscriber, or +# b) a sequence is missing on the publisher. +########## + +# Create a new sequence 'regress_s4' whose START value is not the same in the +# publisher and subscriber. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s4 START 1 INCREMENT 2; +)); + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s4 START 10 INCREMENT 2; +)); + +my $log_offset = -s $node_subscriber->logfile; + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION"); + +# Verify that an error is logged for parameter differences on sequence +# ('regress_s4'). +$node_subscriber->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? mismatched or renamed sequence on subscriber \("public.regress_s4"\)\n.*ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"/, + $log_offset); + +# Verify that an error is logged for the missing sequence ('regress_s4'). +$node_publisher->safe_psql('postgres', qq(DROP SEQUENCE regress_s4;)); + +$node_subscriber->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)\n.*ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"/, + $log_offset); done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 018b5919cf66..2ca7b75af579 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -526,6 +526,7 @@ CopyMethod CopyMultiInsertBuffer CopyMultiInsertInfo CopyOnErrorChoice +CopySeqResult CopySource CopyStmt CopyToRoutine @@ -1629,6 +1630,7 @@ LogicalRepRelId LogicalRepRelMapEntry LogicalRepRelation LogicalRepRollbackPreparedTxnData +LogicalRepSequenceInfo LogicalRepStreamAbortData LogicalRepTupleData LogicalRepTyp From 45075f400a825bfefa34c9cbc10201e1cfbfaab5 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Mon, 27 Oct 2025 09:18:07 +0530 Subject: [PATCH 2/3] Documentation for sequence synchronization feature. Documentation for sequence synchronization feature. Author: Vignesh C Reviewed-by: Amit Kapila Reviewed-by: shveta malik Reviewed-by: Hou Zhijie Reviewed-by: Masahiko Sawada Reviewed-by: Hayato Kuroda Reviewed-by: Dilip Kumar Reviewed-by: Peter Smith Reviewed-by: Nisha Moond Reviewed-by: Shlok Kyal Discussion: https://www.postgresql.org/message-id/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.co --- doc/src/sgml/catalogs.sgml | 2 +- doc/src/sgml/config.sgml | 14 +- doc/src/sgml/func/func-sequence.sgml | 24 +++ doc/src/sgml/logical-replication.sgml | 224 ++++++++++++++++++++-- doc/src/sgml/monitoring.sgml | 5 +- doc/src/sgml/ref/alter_subscription.sgml | 15 ++ doc/src/sgml/ref/create_subscription.sgml | 19 +- 7 files changed, 275 insertions(+), 28 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 6c8a0f173c97..2fc634429802 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6568,7 +6568,7 @@ SCRAM-SHA-256$<iteration count>:&l (references pg_class.oid) - Reference to relation + Reference to table or sequence diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 06d1e4403b55..f5cbc68b938e 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5191,9 +5191,9 @@ ANY num_sync ( num_sync ( num_sync ( + + pg_get_sequence_data + + pg_get_sequence_data ( regclass ) + record + ( last_value bigint, + is_called bool, + page_lsn pg_lsn ) + + + Returns information about the sequence. last_value + indicates last sequence value set in sequence by nextval or setval, + is_called indicates whether the sequence has been + used, and page_lsn is the LSN corresponding to the + most recent WAL record that modified this sequence relation. + + + This function requires USAGE + or SELECT privilege on the sequence. + + diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index b01f5e998b2c..9e78c2f0465b 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -113,7 +113,9 @@ Publications may currently only contain tables or sequences. Objects must be added explicitly, except when a publication is created using FOR TABLES IN SCHEMA, FOR ALL TABLES, - or FOR ALL SEQUENCES. + or FOR ALL SEQUENCES. Unlike tables, sequences can be + synchronized at any time. For more information, see + . @@ -1745,6 +1747,194 @@ Publications: + + Replicating Sequences + + + To synchronize sequences from a publisher to a subscriber, first publish + them using + CREATE PUBLICATION ... FOR ALL SEQUENCES and then + on the subscriber: + + + + + + + use CREATE SUBSCRIPTION + to initially synchronize the published sequences. + + + + + use + ALTER SUBSCRIPTION ... REFRESH PUBLICATION + to synchronize only newly added sequences. + + + + + use + ALTER SUBSCRIPTION ... REFRESH SEQUENCES + to re-synchronize all sequences currently known to the subscription. + + + + + + + A new sequence synchronization worker will be started + after executing any of the above subscriber commands, and will exit once the + sequences are synchronized. + + + The ability to launch a sequence synchronization worker is limited by the + + max_sync_workers_per_subscription + configuration. + + + + Sequence Definition Mismatches + + The sequence synchronization worker validates that sequence definitions + match between publisher and subscriber. If mismatches exist, the worker + logs an error identifying them and exits. The apply worker continues + respawning the sequence synchronization worker until synchronization + succeeds. See also + wal_retrieve_retry_interval. + + + To resolve this, use + ALTER SEQUENCE + to align the subscriber's sequence parameters with those of the publisher. + + + + + Refreshing Stale Sequences + + Subscriber side sequence values may frequently become out of sync due to + updates on the publisher. + + + Subscriber sequence values drift out of sync as the publisher advances + them. Compare values between publisher and subscriber, then run + + ALTER SUBSCRIPTION ... REFRESH SEQUENCES to + resynchronize if necessary. + + + + + Examples + + + Create some sequences on the publisher. + +/* pub # */ CREATE SEQUENCE s1 START WITH 10 INCREMENT BY 1; +/* pub # */ CREATE SEQUENCE s2 START WITH 100 INCREMENT BY 10; + + + + Create the same sequences on the subscriber. + +/* sub # */ CREATE SEQUENCE s1 START WITH 10 INCREMENT BY 1 +/* sub # */ CREATE SEQUENCE s2 START WITH 100 INCREMENT BY 10; + + + + Update the sequences at the publisher side a few times. + +/* pub # */ SELECT nextval('s1'); + nextval +--------- + 10 +(1 row) +/* pub # */ SELECT nextval('s1'); + nextval +--------- + 11 +(1 row) +/* pub # */ SELECT nextval('s2'); + nextval +--------- + 100 +(1 row) +/* pub # */ SELECT nextval('s2'); + nextval +--------- + 110 +(1 row) + + + + Create a publication for the sequences. + +/* pub # */ CREATE PUBLICATION pub1 FOR ALL SEQUENCES; + + + + Subscribe to the publication. + +/* sub # */ CREATE SUBSCRIPTION sub1 +/* sub - */ CONNECTION 'host=localhost dbname=test_pub application_name=sub1' +/* sub - */ PUBLICATION pub1; + + + + Observe that initial sequence values are synchronized. + +/* sub # */ SELECT * FROM s1; + last_value | log_cnt | is_called +------------+---------+----------- + 11 | 31 | t +(1 row) + +/* sub # */ SELECT * FROM s2; + last_value | log_cnt | is_called +------------+---------+----------- + 110 | 31 | t +(1 row) + + + + Update the sequences at the publisher side. + +/* pub # */ SELECT nextval('s1'); + nextval +--------- + 12 +(1 row) +/* pub # */ SELECT nextval('s2'); + nextval +--------- + 120 +(1 row) + + + + Re-synchronize all sequences known to the subscriber using + + ALTER SUBSCRIPTION ... REFRESH SEQUENCES. + +/* sub # */ ALTER SUBSCRIPTION sub1 REFRESH SEQUENCES; + +/* sub # */ SELECT * FROM s1; + last_value | log_cnt | is_called +------------+---------+----------- + 12 | 30 | t +(1 row) + +/* sub # */ SELECT * FROM s2 + last_value | log_cnt | is_called +------------+---------+----------- + 120 | 30 | t +(1 row) + + + + Conflicts @@ -2090,16 +2280,19 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER - Sequence data is not replicated. The data in serial or identity columns - backed by sequences will of course be replicated as part of the table, - but the sequence itself would still show the start value on the - subscriber. If the subscriber is used as a read-only database, then this - should typically not be a problem. If, however, some kind of switchover - or failover to the subscriber database is intended, then the sequences - would need to be updated to the latest values, either by copying the - current data from the publisher (perhaps - using pg_dump) or by determining a sufficiently high - value from the tables themselves. + Incremental sequence changes are not replicated. Although the data in + serial or identity columns backed by sequences will be replicated as part + of the table, the sequences themselves do not replicate ongoing changes. + On the subscriber, a sequence will retain the last value it synchronized + from the publisher. If the subscriber is used as a read-only database, + then this should typically not be a problem. If, however, some kind of + switchover or failover to the subscriber database is intended, then the + sequences would need to be updated to the latest values, either by + executing + ALTER SUBSCRIPTION ... REFRESH SEQUENCES + or by copying the current data from the publisher (perhaps using + pg_dump) or by determining a sufficiently high value + from the tables themselves. @@ -2423,8 +2616,8 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER max_logical_replication_workers must be set to at least the number of subscriptions (for leader apply - workers), plus some reserve for the table synchronization workers and - parallel apply workers. + workers), plus some reserve for the parallel apply workers, and + table/sequence synchronization workers. @@ -2437,8 +2630,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER max_sync_workers_per_subscription - controls the amount of parallelism of the initial data copy during the - subscription initialization or when new tables are added. + controls how many tables can be synchronized in parallel during + subscription initialization or when new tables are added. One additional + worker is also needed for sequence synchronization. diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index f3bf527d5b4b..e3523ac882d9 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2045,8 +2045,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage Type of the subscription worker process. Possible types are - apply, parallel apply, and - table synchronization. + apply, parallel apply, + table synchronization, and + sequence synchronization. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 8ab3b7fbd377..27c06439f4fd 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -195,6 +195,12 @@ ALTER SUBSCRIPTION name RENAME TO < use ALTER SUBSCRIPTION ... REFRESH SEQUENCES. + + See for recommendations on how + to handle any warnings about sequence definition differences between + the publisher and the subscriber, which might occur when + copy_data = true. + See for details of how copy_data = true can interact with the @@ -225,6 +231,15 @@ ALTER SUBSCRIPTION name RENAME TO < data for all currently subscribed sequences. It does not add or remove sequences from the subscription to match the publication. + + See for + recommendations on how to handle any warnings about sequence definition + differences between the publisher and the subscriber. + + + See for recommendations on how to + identify and handle out-of-sync sequences. + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index ed82cf1809e5..d2ca3165f8a1 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -228,7 +228,7 @@ CREATE SUBSCRIPTION subscription_name for more about send/receive - functions). + functions). This parameter has no effect for sequences. @@ -265,6 +265,12 @@ CREATE SUBSCRIPTION subscription_namecopy_data = true can interact with the origin parameter. + + See + for recommendations on how to handle any warnings about sequence + definition differences between the publisher and the subscriber, + which might occur when copy_data = true. + @@ -280,6 +286,7 @@ CREATE SUBSCRIPTION subscription_name @@ -310,7 +317,8 @@ CREATE SUBSCRIPTION subscription_name setting within this subscription's apply worker processes. The default value - is off. + is off. This parameter has no effect for + sequences. @@ -340,7 +348,8 @@ CREATE SUBSCRIPTION subscription_name Specifies whether two-phase commit is enabled for this subscription. - The default is false. + The default is false. This parameter has no effect + for sequences. @@ -417,6 +426,7 @@ CREATE SUBSCRIPTION subscription_nameorigin to any means that the publisher sends changes regardless of their origin. The default is any. + This parameter has no effect for sequences. See for details of how @@ -449,7 +459,8 @@ CREATE SUBSCRIPTION subscription_name is enabled, and a physical replication slot named pg_conflict_detection is created on the subscriber to prevent the information for detecting - conflicts from being removed. + conflicts from being removed. This parameter has no effect for + sequences. From ce57eaef3572eda5ea6bf8eaf8463b4afeebf060 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Mon, 3 Nov 2025 19:50:16 +0530 Subject: [PATCH 3/3] Add seq_sync_error_count to subscription statistics. This commit introduces a new column seq_sync_error_count to subscription statistics. The new field tracks the number of errors encountered during sequence synchronization for each subscription. --- doc/src/sgml/monitoring.sgml | 9 +++ src/backend/catalog/system_views.sql | 1 + .../replication/logical/sequencesync.c | 3 + src/backend/replication/logical/tablesync.c | 3 +- src/backend/replication/logical/worker.c | 18 ++--- .../utils/activity/pgstat_subscription.c | 27 +++++-- src/backend/utils/adt/pgstatfuncs.c | 27 ++++--- src/include/catalog/pg_proc.dat | 6 +- src/include/pgstat.h | 6 +- src/test/regress/expected/rules.out | 3 +- src/test/subscription/t/026_stats.pl | 80 ++++++++++++------- 11 files changed, 123 insertions(+), 60 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index e3523ac882d9..1dc0024ab921 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2193,6 +2193,15 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + sequence_sync_error_count bigint + + + Number of times an error occurred during the sequence synchronization + + + sync_error_count bigint diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index dec8df4f8ee6..059e8778ca7c 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1415,6 +1415,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.subid, s.subname, ss.apply_error_count, + ss.seq_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index 623311b55ab6..22e3018f1eab 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -710,6 +710,9 @@ start_sequence_sync() * idle state. */ AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_SEQUENCESYNC); + PG_RE_THROW(); } } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e5a2856fd174..dcc6124cc730 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1530,7 +1530,8 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_TABLESYNC); PG_RE_THROW(); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index cbf311efd57e..8b89eddb0cc7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -5606,7 +5606,8 @@ start_apply(XLogRecPtr origin_startpos) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + pgstat_report_subscription_error(MySubscription->oid, + MyLogicalRepWorker->type); PG_RE_THROW(); } @@ -5953,15 +5954,12 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); - if (am_leader_apply_worker() || am_tablesync_worker()) - { - /* - * Report the worker failed during either table synchronization or - * apply. - */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); - } + /* + * Report the worker failed during either sequence synchronization or + * table synchronization or apply. + */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->type); /* Disable the subscription */ StartTransactionCommand(); diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index f9a1c831a07e..35916772b9dc 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -17,6 +17,7 @@ #include "postgres.h" +#include "replication/worker_internal.h" #include "utils/pgstat_internal.h" @@ -24,7 +25,7 @@ * Report a subscription error. */ void -pgstat_report_subscription_error(Oid subid, bool is_apply_error) +pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) { PgStat_EntryRef *entry_ref; PgStat_BackendSubEntry *pending; @@ -33,10 +34,25 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error) InvalidOid, subid, NULL); pending = entry_ref->pending; - if (is_apply_error) - pending->apply_error_count++; - else - pending->sync_error_count++; + switch (wtype) + { + case WORKERTYPE_APPLY: + pending->apply_error_count++; + break; + + case WORKERTYPE_SEQUENCESYNC: + pending->seq_sync_error_count++; + break; + + case WORKERTYPE_TABLESYNC: + pending->sync_error_count++; + break; + + default: + /* Should never happen. */ + Assert(0); + break; + } } /* @@ -115,6 +131,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) #define SUB_ACC(fld) shsubent->stats.fld += localent->fld SUB_ACC(apply_error_count); + SUB_ACC(seq_sync_error_count); SUB_ACC(sync_error_count); for (int i = 0; i < CONFLICT_NUM_TYPES; i++) SUB_ACC(conflict_count[i]); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index a710508979e4..1521d6e2ab43 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2203,7 +2203,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 13 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2221,25 +2221,27 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "seq_sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2256,6 +2258,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) /* apply_error_count */ values[i++] = Int64GetDatum(subentry->apply_error_count); + /* seq_sync_error_count */ + values[i++] = Int64GetDatum(subentry->seq_sync_error_count); + /* sync_error_count */ values[i++] = Int64GetDatum(subentry->sync_error_count); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 34b7fddb0e7a..5cf9e12fcb9a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5704,9 +5704,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,seq_sync_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 7ae503e71a27..a0610bb3e316 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -16,6 +16,7 @@ #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ #include "replication/conflict.h" +#include "replication/worker_internal.h" #include "utils/backend_progress.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/backend_status.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/pgstat_kind.h" @@ -108,6 +109,7 @@ typedef struct PgStat_FunctionCallUsage typedef struct PgStat_BackendSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter seq_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; } PgStat_BackendSubEntry; @@ -416,6 +418,7 @@ typedef struct PgStat_SLRUStats typedef struct PgStat_StatSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter seq_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; TimestampTz stat_reset_timestamp; @@ -769,7 +772,8 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void); * Functions in pgstat_subscription.c */ -extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error); +extern void pgstat_report_subscription_error(Oid subid, + LogicalRepWorkerType wtype); extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type); extern void pgstat_create_subscription(Oid subid); extern void pgstat_drop_subscription(Oid subid); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 77e25ca029e5..fe20f613c3af 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2191,6 +2191,7 @@ pg_stat_subscription| SELECT su.oid AS subid, pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, + ss.seq_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, @@ -2202,7 +2203,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_multiple_unique_conflicts, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, seq_sync_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl index 00a1c2fcd48c..23f3511f9a4f 100644 --- a/src/test/subscription/t/026_stats.pl +++ b/src/test/subscription/t/026_stats.pl @@ -21,7 +21,8 @@ sub create_sub_pub_w_errors { - my ($node_publisher, $node_subscriber, $db, $table_name) = @_; + my ($node_publisher, $node_subscriber, $db, $table_name, $sequence_name) + = @_; # Initial table setup on both publisher and subscriber. On subscriber we # create the same tables but with primary keys. Also, insert some data that # will conflict with the data replicated from publisher later. @@ -32,6 +33,7 @@ sub create_sub_pub_w_errors CREATE TABLE $table_name(a int); ALTER TABLE $table_name REPLICA IDENTITY FULL; INSERT INTO $table_name VALUES (1); + CREATE SEQUENCE $sequence_name; COMMIT; ]); $node_subscriber->safe_psql( @@ -40,45 +42,56 @@ sub create_sub_pub_w_errors BEGIN; CREATE TABLE $table_name(a int primary key); INSERT INTO $table_name VALUES (1); + CREATE SEQUENCE $sequence_name INCREMENT BY 10; COMMIT; ]); # Set up publication. my $pub_name = $table_name . '_pub'; + my $pub_seq_name = $sequence_name . '_pub'; my $publisher_connstr = $node_publisher->connstr . qq( dbname=$db); - $node_publisher->safe_psql($db, - qq(CREATE PUBLICATION $pub_name FOR TABLE $table_name)); + $node_publisher->safe_psql( + $db, + qq[ + CREATE PUBLICATION $pub_name FOR TABLE $table_name; + CREATE PUBLICATION $pub_seq_name FOR ALL SEQUENCES; + ]); # Create subscription. The tablesync for table on subscription will enter into - # infinite error loop due to violating the unique constraint. + # infinite error loop due to violating the unique constraint. The sequencesync + # will also fail due to different sequence increment values on publisher and + # subscriber. my $sub_name = $table_name . '_sub'; $node_subscriber->safe_psql($db, - qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name) + qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name, $pub_seq_name) ); $node_publisher->wait_for_catchup($sub_name); - # Wait for the tablesync error to be reported. + # Wait for the tablesync and sequencesync error to be reported. $node_subscriber->poll_query_until( $db, qq[ - SELECT sync_error_count > 0 - FROM pg_stat_subscription_stats - WHERE subname = '$sub_name' + SELECT count(1) = 1 FROM pg_stat_subscription_stats + WHERE subname = '$sub_name' and seq_sync_error_count > 0 and sync_error_count > 0 ]) or die qq(Timed out while waiting for tablesync errors for subscription '$sub_name'); + # Change the sequence start value on the subscriber so that it doesn't error out. + $node_subscriber->safe_psql($db, + qq(ALTER SEQUENCE $sequence_name INCREMENT 1)); + # Truncate test_tab1 so that tablesync worker can continue. $node_subscriber->safe_psql($db, qq(TRUNCATE $table_name)); - # Wait for initial tablesync to finish. + # Wait for initial sync to finish. $node_subscriber->poll_query_until( $db, qq[ - SELECT count(1) = 1 FROM pg_subscription_rel - WHERE srrelid = '$table_name'::regclass AND srsubstate in ('r', 's') + SELECT count(1) = 2 FROM pg_subscription_rel + WHERE srrelid IN ('$table_name'::regclass, '$sequence_name'::regclass) AND srsubstate in ('r', 's') ]) or die qq(Timed out while waiting for subscriber to synchronize data for table '$table_name'.); @@ -136,14 +149,17 @@ sub create_sub_pub_w_errors # Create the publication and subscription with sync and apply errors my $table1_name = 'test_tab1'; +my $sequence1_name = 'test_seq1'; my ($pub1_name, $sub1_name) = create_sub_pub_w_errors($node_publisher, $node_subscriber, $db, - $table1_name); + $table1_name, $sequence1_name); -# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset +# timestamp is NULL. is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, + seq_sync_error_count > 0, sync_error_count > 0, confl_insert_exists > 0, confl_delete_missing > 0, @@ -151,8 +167,8 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), - qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.) + qq(t|t|t|t|t|t), + qq(Check that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.) ); # Reset a single subscription @@ -160,10 +176,12 @@ sub create_sub_pub_w_errors qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name'))) ); -# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and +# stats_reset timestamp is not NULL. is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, + seq_sync_error_count = 0, sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, @@ -171,8 +189,8 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.) ); # Get reset timestamp @@ -198,14 +216,17 @@ sub create_sub_pub_w_errors # Make second subscription and publication my $table2_name = 'test_tab2'; +my $sequence2_name = 'test_seq2'; my ($pub2_name, $sub2_name) = create_sub_pub_w_errors($node_publisher, $node_subscriber, $db, - $table2_name); + $table2_name, $sequence2_name); -# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 +# and stats_reset timestamp is NULL is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, + seq_sync_error_count > 0, sync_error_count > 0, confl_insert_exists > 0, confl_delete_missing > 0, @@ -213,18 +234,20 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.) ); # Reset all subscriptions $node_subscriber->safe_psql($db, qq(SELECT pg_stat_reset_subscription_stats(NULL))); -# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and +# stats_reset timestamp is not NULL. is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, + seq_sync_error_count = 0, sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, @@ -232,13 +255,14 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.) ); is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, + seq_sync_error_count = 0, sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, @@ -246,8 +270,8 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.) ); $reset_time1 = $node_subscriber->safe_psql($db,