diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 6c8a0f173c97..2fc634429802 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -6568,7 +6568,7 @@ SCRAM-SHA-256$<iteration count>:&l
(references pg_class.oid)
- Reference to relation
+ Reference to table or sequence
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 06d1e4403b55..f5cbc68b938e 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5191,9 +5191,9 @@ ANY num_sync ( num_sync ( num_sync (
+
+ pg_get_sequence_data
+
+ pg_get_sequence_data ( regclass )
+ record
+ ( last_valuebigint,
+ is_calledbool,
+ page_lsnpg_lsn )
+
+
+ Returns information about the sequence. last_value
+ indicates last sequence value set in sequence by nextval or setval,
+ is_called indicates whether the sequence has been
+ used, and page_lsn is the LSN corresponding to the
+ most recent WAL record that modified this sequence relation.
+
+
+ This function requires USAGE
+ or SELECT privilege on the sequence.
+
+
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index b01f5e998b2c..9e78c2f0465b 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -113,7 +113,9 @@
Publications may currently only contain tables or sequences. Objects must be
added explicitly, except when a publication is created using
FOR TABLES IN SCHEMA, FOR ALL TABLES,
- or FOR ALL SEQUENCES.
+ or FOR ALL SEQUENCES. Unlike tables, sequences can be
+ synchronized at any time. For more information, see
+ .
@@ -1745,6 +1747,194 @@ Publications:
+
+ Replicating Sequences
+
+
+ To synchronize sequences from a publisher to a subscriber, first publish
+ them using
+ CREATE PUBLICATION ... FOR ALL SEQUENCES and then
+ on the subscriber:
+
+
+
+
+
+
+ use CREATE SUBSCRIPTION
+ to initially synchronize the published sequences.
+
+
+
+
+ use
+ ALTER SUBSCRIPTION ... REFRESH PUBLICATION
+ to synchronize only newly added sequences.
+
+
+
+
+ use
+ ALTER SUBSCRIPTION ... REFRESH SEQUENCES
+ to re-synchronize all sequences currently known to the subscription.
+
+
+
+
+
+
+ A new sequence synchronization worker will be started
+ after executing any of the above subscriber commands, and will exit once the
+ sequences are synchronized.
+
+
+ The ability to launch a sequence synchronization worker is limited by the
+
+ max_sync_workers_per_subscription
+ configuration.
+
+
+
+ Sequence Definition Mismatches
+
+ The sequence synchronization worker validates that sequence definitions
+ match between publisher and subscriber. If mismatches exist, the worker
+ logs an error identifying them and exits. The apply worker continues
+ respawning the sequence synchronization worker until synchronization
+ succeeds. See also
+ wal_retrieve_retry_interval.
+
+
+ To resolve this, use
+ ALTER SEQUENCE
+ to align the subscriber's sequence parameters with those of the publisher.
+
+
+
+
+ Refreshing Stale Sequences
+
+ Subscriber side sequence values may frequently become out of sync due to
+ updates on the publisher.
+
+
+ Subscriber sequence values drift out of sync as the publisher advances
+ them. Compare values between publisher and subscriber, then run
+
+ ALTER SUBSCRIPTION ... REFRESH SEQUENCES to
+ resynchronize if necessary.
+
+
+
+
+ Examples
+
+
+ Create some sequences on the publisher.
+
+/* pub # */ CREATE SEQUENCE s1 START WITH 10 INCREMENT BY 1;
+/* pub # */ CREATE SEQUENCE s2 START WITH 100 INCREMENT BY 10;
+
+
+
+ Create the same sequences on the subscriber.
+
+/* sub # */ CREATE SEQUENCE s1 START WITH 10 INCREMENT BY 1
+/* sub # */ CREATE SEQUENCE s2 START WITH 100 INCREMENT BY 10;
+
+
+
+ Update the sequences at the publisher side a few times.
+
+/* pub # */ SELECT nextval('s1');
+ nextval
+---------
+ 10
+(1 row)
+/* pub # */ SELECT nextval('s1');
+ nextval
+---------
+ 11
+(1 row)
+/* pub # */ SELECT nextval('s2');
+ nextval
+---------
+ 100
+(1 row)
+/* pub # */ SELECT nextval('s2');
+ nextval
+---------
+ 110
+(1 row)
+
+
+
+ Create a publication for the sequences.
+
+/* pub # */ CREATE PUBLICATION pub1 FOR ALL SEQUENCES;
+
+
+
+ Subscribe to the publication.
+
+/* sub # */ CREATE SUBSCRIPTION sub1
+/* sub - */ CONNECTION 'host=localhost dbname=test_pub application_name=sub1'
+/* sub - */ PUBLICATION pub1;
+
+
+
+ Observe that initial sequence values are synchronized.
+
+/* sub # */ SELECT * FROM s1;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 11 | 31 | t
+(1 row)
+
+/* sub # */ SELECT * FROM s2;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 110 | 31 | t
+(1 row)
+
+
+
+ Update the sequences at the publisher side.
+
+/* pub # */ SELECT nextval('s1');
+ nextval
+---------
+ 12
+(1 row)
+/* pub # */ SELECT nextval('s2');
+ nextval
+---------
+ 120
+(1 row)
+
+
+
+ Re-synchronize all sequences known to the subscriber using
+
+ ALTER SUBSCRIPTION ... REFRESH SEQUENCES.
+
+/* sub # */ ALTER SUBSCRIPTION sub1 REFRESH SEQUENCES;
+
+/* sub # */ SELECT * FROM s1;
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 12 | 30 | t
+(1 row)
+
+/* sub # */ SELECT * FROM s2
+ last_value | log_cnt | is_called
+------------+---------+-----------
+ 120 | 30 | t
+(1 row)
+
+
+
+
Conflicts
@@ -2090,16 +2280,19 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
- Sequence data is not replicated. The data in serial or identity columns
- backed by sequences will of course be replicated as part of the table,
- but the sequence itself would still show the start value on the
- subscriber. If the subscriber is used as a read-only database, then this
- should typically not be a problem. If, however, some kind of switchover
- or failover to the subscriber database is intended, then the sequences
- would need to be updated to the latest values, either by copying the
- current data from the publisher (perhaps
- using pg_dump) or by determining a sufficiently high
- value from the tables themselves.
+ Incremental sequence changes are not replicated. Although the data in
+ serial or identity columns backed by sequences will be replicated as part
+ of the table, the sequences themselves do not replicate ongoing changes.
+ On the subscriber, a sequence will retain the last value it synchronized
+ from the publisher. If the subscriber is used as a read-only database,
+ then this should typically not be a problem. If, however, some kind of
+ switchover or failover to the subscriber database is intended, then the
+ sequences would need to be updated to the latest values, either by
+ executing
+ ALTER SUBSCRIPTION ... REFRESH SEQUENCES
+ or by copying the current data from the publisher (perhaps using
+ pg_dump) or by determining a sufficiently high value
+ from the tables themselves.
@@ -2423,8 +2616,8 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
max_logical_replication_workers
must be set to at least the number of subscriptions (for leader apply
- workers), plus some reserve for the table synchronization workers and
- parallel apply workers.
+ workers), plus some reserve for the parallel apply workers, and
+ table/sequence synchronization workers.
@@ -2437,8 +2630,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
max_sync_workers_per_subscription
- controls the amount of parallelism of the initial data copy during the
- subscription initialization or when new tables are added.
+ controls how many tables can be synchronized in parallel during
+ subscription initialization or when new tables are added. One additional
+ worker is also needed for sequence synchronization.
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index f3bf527d5b4b..1dc0024ab921 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2045,8 +2045,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage
Type of the subscription worker process. Possible types are
- apply, parallel apply, and
- table synchronization.
+ apply, parallel apply,
+ table synchronization, and
+ sequence synchronization.
@@ -2192,6 +2193,15 @@ description | Waiting for a newly initialized WAL file to reach durable storage
+
+
+ sequence_sync_error_countbigint
+
+
+ Number of times an error occurred during the sequence synchronization
+
+
+
sync_error_countbigint
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 8ab3b7fbd377..27c06439f4fd 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -195,6 +195,12 @@ ALTER SUBSCRIPTION name RENAME TO <
use
ALTER SUBSCRIPTION ... REFRESH SEQUENCES.
+
+ See for recommendations on how
+ to handle any warnings about sequence definition differences between
+ the publisher and the subscriber, which might occur when
+ copy_data = true.
+
See for details of
how copy_data = true can interact with the
@@ -225,6 +231,15 @@ ALTER SUBSCRIPTION name RENAME TO <
data for all currently subscribed sequences. It does not add or remove
sequences from the subscription to match the publication.
+
+ See for
+ recommendations on how to handle any warnings about sequence definition
+ differences between the publisher and the subscriber.
+
+
+ See for recommendations on how to
+ identify and handle out-of-sync sequences.
+
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index ed82cf1809e5..d2ca3165f8a1 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -228,7 +228,7 @@ CREATE SUBSCRIPTION subscription_name for more about send/receive
- functions).
+ functions). This parameter has no effect for sequences.
@@ -265,6 +265,12 @@ CREATE SUBSCRIPTION subscription_namecopy_data = true can interact with the
origin parameter.
+
+ See
+ for recommendations on how to handle any warnings about sequence
+ definition differences between the publisher and the subscriber,
+ which might occur when copy_data = true.
+
@@ -280,6 +286,7 @@ CREATE SUBSCRIPTION subscription_name
@@ -310,7 +317,8 @@ CREATE SUBSCRIPTION subscription_name setting within this
subscription's apply worker processes. The default value
- is off.
+ is off. This parameter has no effect for
+ sequences.
@@ -340,7 +348,8 @@ CREATE SUBSCRIPTION subscription_name
Specifies whether two-phase commit is enabled for this subscription.
- The default is false.
+ The default is false. This parameter has no effect
+ for sequences.
@@ -417,6 +426,7 @@ CREATE SUBSCRIPTION subscription_nameorigin
to any means that the publisher sends changes
regardless of their origin. The default is any.
+ This parameter has no effect for sequences.
See for details of how
@@ -449,7 +459,8 @@ CREATE SUBSCRIPTION subscription_name is enabled, and a physical
replication slot named pg_conflict_detection
is created on the subscriber to prevent the information for detecting
- conflicts from being removed.
+ conflicts from being removed. This parameter has no effect for
+ sequences.
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 15b233a37d8d..1945627ed88e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -354,7 +354,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid));
if (!HeapTupleIsValid(tup))
- elog(ERROR, "subscription table %u in subscription %u does not exist",
+ elog(ERROR, "subscription relation %u in subscription %u does not exist",
relid, subid);
/* Update the tuple. */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index dec8df4f8ee6..059e8778ca7c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1415,6 +1415,7 @@ CREATE VIEW pg_stat_subscription_stats AS
ss.subid,
s.subname,
ss.apply_error_count,
+ ss.seq_sync_error_count,
ss.sync_error_count,
ss.confl_insert_exists,
ss.confl_update_origin_differs,
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index c23dee5231c0..8d671b7a29d6 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -112,7 +112,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity,
bool *is_called,
bool *need_seq_rewrite,
List **owned_by);
-static void do_setval(Oid relid, int64 next, bool iscalled);
static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity);
@@ -954,8 +953,8 @@ lastval(PG_FUNCTION_ARGS)
* it is the only way to clear the is_called flag in an existing
* sequence.
*/
-static void
-do_setval(Oid relid, int64 next, bool iscalled)
+void
+SetSequence(Oid relid, int64 next, bool iscalled)
{
SeqTable elm;
Relation seqrel;
@@ -1056,7 +1055,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
/*
* Implement the 2 arg setval procedure.
- * See do_setval for discussion.
+ * See SetSequence for discussion.
*/
Datum
setval_oid(PG_FUNCTION_ARGS)
@@ -1064,14 +1063,14 @@ setval_oid(PG_FUNCTION_ARGS)
Oid relid = PG_GETARG_OID(0);
int64 next = PG_GETARG_INT64(1);
- do_setval(relid, next, true);
+ SetSequence(relid, next, true);
PG_RETURN_INT64(next);
}
/*
* Implement the 3 arg setval procedure.
- * See do_setval for discussion.
+ * See SetSequence for discussion.
*/
Datum
setval3_oid(PG_FUNCTION_ARGS)
@@ -1080,7 +1079,7 @@ setval3_oid(PG_FUNCTION_ARGS)
int64 next = PG_GETARG_INT64(1);
bool iscalled = PG_GETARG_BOOL(2);
- do_setval(relid, next, iscalled);
+ SetSequence(relid, next, iscalled);
PG_RETURN_INT64(next);
}
@@ -1797,8 +1796,9 @@ pg_sequence_parameters(PG_FUNCTION_ARGS)
/*
* Return the sequence tuple along with its page LSN.
*
- * This is primarily intended for use by pg_dump to gather sequence data
- * without needing to individually query each sequence relation.
+ * This is primarily used by pg_dump to efficiently collect sequence data
+ * without querying each sequence individually, and is also leveraged by
+ * logical replication while synchronizing sequences.
*/
Datum
pg_get_sequence_data(PG_FUNCTION_ARGS)
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 1ad65c237c34..142a02eb5e95 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -131,7 +131,10 @@ static const struct
"ParallelApplyWorkerMain", ParallelApplyWorkerMain
},
{
- "TablesyncWorkerMain", TablesyncWorkerMain
+ "TableSyncWorkerMain", TableSyncWorkerMain
+ },
+ {
+ "SequenceSyncWorkerMain", SequenceSyncWorkerMain
}
};
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c62c8c67521c..c719af1f8a94 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -26,6 +26,7 @@ OBJS = \
proto.o \
relation.o \
reorderbuffer.o \
+ sequencesync.o \
slotsync.o \
snapbuild.o \
syncutils.o \
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 95b5cae9a552..bb49ad17dc7e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -248,9 +248,10 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
* Walks the workers array and searches for one that matches given worker type,
* subscription id, and relation id.
*
- * For apply workers, the relid should be set to InvalidOid, as they manage
- * changes across all tables. For table sync workers, the relid should be set
- * to the OID of the relation being synchronized.
+ * For apply workers and sequencesync workers, the relid should be set to
+ * InvalidOid, as they manage changes across all tables. For tablesync
+ * workers, the relid should be set to the OID of the relation being
+ * synchronized.
*/
LogicalRepWorker *
logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
@@ -334,6 +335,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
int nparallelapplyworkers;
TimestampTz now;
bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+ bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
/*----------
@@ -422,7 +424,8 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
*/
- if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
+ if ((is_tablesync_worker || is_sequencesync_worker) &&
+ nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
@@ -478,6 +481,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
TIMESTAMP_NOBEGIN(worker->last_recv_time);
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
+ worker->last_seqsync_start_time = 0;
/* Before releasing lock, remember generation for future identification. */
generation = worker->generation;
@@ -511,8 +515,16 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
break;
+ case WORKERTYPE_SEQUENCESYNC:
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication sequencesync worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
+ break;
+
case WORKERTYPE_TABLESYNC:
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication tablesync worker for subscription %u sync %u",
subid,
@@ -848,6 +860,33 @@ logicalrep_launcher_onexit(int code, Datum arg)
LogicalRepCtx->launcher_pid = 0;
}
+/*
+ * Reset the last_seqsync_start_time of the sequencesync worker in the
+ * subscription's apply worker.
+ *
+ * Note that this value is not stored in the sequencesync worker, because that
+ * has finished already and is about to exit.
+ */
+void
+logicalrep_reset_seqsync_start_time(void)
+{
+ LogicalRepWorker *worker;
+
+ /*
+ * The apply worker can't access last_seqsync_start_time concurrently, so
+ * it is okay to use SHARED lock here. See ProcessSequencesForSync().
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ worker = logicalrep_worker_find(WORKERTYPE_APPLY,
+ MyLogicalRepWorker->subid, InvalidOid,
+ true);
+ if (worker)
+ worker->last_seqsync_start_time = 0;
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
+
/*
* Cleanup function.
*
@@ -896,7 +935,7 @@ logicalrep_sync_worker_count(Oid subid)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (isTablesyncWorker(w) && w->subid == subid)
+ if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
res++;
}
@@ -1610,7 +1649,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);
- if (isTablesyncWorker(&worker))
+ if (isTableSyncWorker(&worker))
values[1] = ObjectIdGetDatum(worker.relid);
else
nulls[1] = true;
@@ -1650,6 +1689,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
case WORKERTYPE_PARALLEL_APPLY:
values[9] = CStringGetTextDatum("parallel apply");
break;
+ case WORKERTYPE_SEQUENCESYNC:
+ values[9] = CStringGetTextDatum("sequence synchronization");
+ break;
case WORKERTYPE_TABLESYNC:
values[9] = CStringGetTextDatum("table synchronization");
break;
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 9283e996ef4a..a2268d8361ee 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -12,6 +12,7 @@ backend_sources += files(
'proto.c',
'relation.c',
'reorderbuffer.c',
+ 'sequencesync.c',
'slotsync.c',
'snapbuild.c',
'syncutils.c',
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
new file mode 100644
index 000000000000..22e3018f1eab
--- /dev/null
+++ b/src/backend/replication/logical/sequencesync.c
@@ -0,0 +1,733 @@
+/*-------------------------------------------------------------------------
+ * sequencesync.c
+ * PostgreSQL logical replication: sequence synchronization
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/sequencesync.c
+ *
+ * NOTES
+ * This file contains code for sequence synchronization for
+ * logical replication.
+ *
+ * Sequences requiring synchronization are tracked in the pg_subscription_rel
+ * catalog.
+ *
+ * Sequences to be synchronized will be added with state INIT when either of
+ * the following commands is executed:
+ * CREATE SUBSCRIPTION
+ * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
+ *
+ * Executing the following command resets all sequences in the subscription to
+ * state INIT, triggering re-synchronization:
+ * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
+ *
+ * The apply worker periodically scans pg_subscription_rel for sequences in
+ * INIT state. When such sequences are found, it spawns a sequencesync worker
+ * to handle synchronization.
+ *
+ * A single sequencesync worker is responsible for synchronizing all sequences.
+ * It begins by retrieving the list of sequences that are flagged for
+ * synchronization, i.e., those in the INIT state. These sequences are then
+ * processed in batches, allowing multiple entries to be synchronized within a
+ * single transaction. The worker fetches the current sequence values and page
+ * LSNs from the remote publisher, updates the corresponding sequences on the
+ * local subscriber, and finally marks each sequence as READY upon successful
+ * synchronization.
+ *
+ * Sequence state transitions follow this pattern:
+ * INIT → READY
+ *
+ * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
+ * sequences are synchronized per transaction. The locks on the sequence
+ * relation will be periodically released at each transaction commit.
+ *
+ * XXX: We didn't choose launcher process to maintain the launch of sequencesync
+ * worker as it didn't have database connection to access the sequences from the
+ * pg_subscription_rel system catalog that need to be synchronized.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "catalog/pg_sequence.h"
+#include "catalog/pg_subscription_rel.h"
+#include "commands/sequence.h"
+#include "pgstat.h"
+#include "postmaster/interrupt.h"
+#include "replication/logicalworker.h"
+#include "replication/worker_internal.h"
+#include "utils/acl.h"
+#include "utils/fmgroids.h"
+#include "utils/guc.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
+#include "utils/syscache.h"
+#include "utils/usercontext.h"
+
+#define REMOTE_SEQ_COL_COUNT 10
+
+typedef enum CopySeqResult
+{
+ COPYSEQ_SUCCESS,
+ COPYSEQ_MISMATCH,
+ COPYSEQ_INSUFFICIENT_PERM,
+ COPYSEQ_SKIPPED
+} CopySeqResult;
+
+static List *seqinfos = NIL;
+
+/*
+ * Apply worker determines if sequence synchronization is needed.
+ *
+ * Start a sequencesync worker if one is not already running. The active
+ * sequencesync worker will handle all pending sequence synchronization. If any
+ * sequences remain unsynchronized after it exits, a new worker can be started
+ * in the next iteration.
+ */
+void
+ProcessSequencesForSync(void)
+{
+ LogicalRepWorker *sequencesync_worker;
+ int nsyncworkers;
+ bool has_pending_sequences;
+ bool started_tx;
+
+ FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
+
+ if (started_tx)
+ {
+ CommitTransactionCommand();
+ pgstat_report_stat(true);
+ }
+
+ if (!has_pending_sequences)
+ return;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /* Check if there is a sequencesync worker already running? */
+ sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
+ MyLogicalRepWorker->subid,
+ InvalidOid, true);
+ if (sequencesync_worker)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+ return;
+ }
+
+ /*
+ * Count running sync workers for this subscription, while we have the
+ * lock.
+ */
+ nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ /*
+ * It is okay to read/update last_seqsync_start_time here in apply worker
+ * as we have already ensured that sync worker doesn't exist.
+ */
+ launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
+ &MyLogicalRepWorker->last_seqsync_start_time);
+}
+
+/*
+ * get_sequences_string
+ *
+ * Build a comma-separated string of schema-qualified sequence names
+ * for the given list of sequence indexes.
+ */
+static void
+get_sequences_string(List *seqindexes, StringInfo buf)
+{
+ resetStringInfo(buf);
+ foreach_int(seqidx, seqindexes)
+ {
+ LogicalRepSequenceInfo *seqinfo =
+ (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
+
+ if (buf->len > 0)
+ appendStringInfoString(buf, ", ");
+
+ appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
+ }
+}
+
+/*
+ * report_sequence_errors
+ *
+ * Report discrepancies found during sequence synchronization between
+ * the publisher and subscriber. Emits warnings for:
+ * a) insufficient privileges
+ * b) mismatched definitions or concurrent rename
+ * c) missing sequences on the subscriber
+ * Then raises an ERROR to indicate synchronization failure.
+ */
+static void
+report_sequence_errors(List *insuffperm_seqs_idx, List *mismatched_seqs_idx,
+ List *missing_seqs_idx)
+{
+ StringInfo seqstr = makeStringInfo();
+
+ if (insuffperm_seqs_idx)
+ {
+ get_sequences_string(insuffperm_seqs_idx, seqstr);
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_plural("insufficient privileges on sequence (%s)",
+ "insufficient privileges on sequences (%s)",
+ list_length(insuffperm_seqs_idx),
+ seqstr->data));
+ }
+
+ if (mismatched_seqs_idx)
+ {
+ get_sequences_string(mismatched_seqs_idx, seqstr);
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
+ "mismatched or renamed sequences on subscriber (%s)",
+ list_length(mismatched_seqs_idx),
+ seqstr->data));
+ }
+
+ if (missing_seqs_idx)
+ {
+ get_sequences_string(missing_seqs_idx, seqstr);
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_plural("missing sequence on publisher (%s)",
+ "missing sequences on publisher (%s)",
+ list_length(missing_seqs_idx),
+ seqstr->data));
+ }
+
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
+ MySubscription->name));
+}
+
+/*
+ * get_and_validate_seq_info
+ *
+ * Extracts remote sequence information from the tuple slot received from the
+ * publisher, and validates it against the corresponding local sequence
+ * definition.
+ */
+static CopySeqResult
+get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
+ LogicalRepSequenceInfo **seqinfo, int *seqidx)
+{
+ bool isnull;
+ int col = 0;
+ Oid remote_typid;
+ int64 remote_start;
+ int64 remote_increment;
+ int64 remote_min;
+ int64 remote_max;
+ bool remote_cycle;
+ CopySeqResult result = COPYSEQ_SUCCESS;
+ HeapTuple tup;
+ Form_pg_sequence local_seq;
+ LogicalRepSequenceInfo *seqinfo_local;
+
+ *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ /* Identify the corresponding local sequence for the given index. */
+ *seqinfo = seqinfo_local =
+ (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
+
+ seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ /* Sanity check */
+ Assert(col == REMOTE_SEQ_COL_COUNT);
+
+ seqinfo_local->found_on_pub = true;
+
+ *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
+
+ /* Sequence was concurrently dropped? */
+ if (!*sequence_rel)
+ return COPYSEQ_SKIPPED;
+
+ tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
+
+ /* Sequence was concurrently dropped? */
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for sequence %u",
+ seqinfo_local->localrelid);
+
+ local_seq = (Form_pg_sequence) GETSTRUCT(tup);
+
+ /* Sequence parameters for remote/local are the same? */
+ if (local_seq->seqtypid != remote_typid ||
+ local_seq->seqstart != remote_start ||
+ local_seq->seqincrement != remote_increment ||
+ local_seq->seqmin != remote_min ||
+ local_seq->seqmax != remote_max ||
+ local_seq->seqcycle != remote_cycle)
+ result = COPYSEQ_MISMATCH;
+
+ /* Sequence was concurrently renamed? */
+ if (strcmp(seqinfo_local->nspname,
+ get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
+ strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
+ result = COPYSEQ_MISMATCH;
+
+ ReleaseSysCache(tup);
+ return result;
+}
+
+/*
+ * Apply remote sequence state to local sequence and mark it as
+ * synchronized (READY).
+ */
+static CopySeqResult
+copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
+{
+ UserContext ucxt;
+ AclResult aclresult;
+ bool run_as_owner = MySubscription->runasowner;
+ Oid seqoid = seqinfo->localrelid;
+
+ /*
+ * If the user did not opt to run as the owner of the subscription
+ * ('run_as_owner'), then copy the sequence as the owner of the sequence.
+ */
+ if (!run_as_owner)
+ SwitchToUntrustedUser(seqowner, &ucxt);
+
+ aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
+
+ if (aclresult != ACLCHECK_OK)
+ {
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
+
+ return COPYSEQ_INSUFFICIENT_PERM;
+ }
+
+ SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
+
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
+
+ /*
+ * Record the remote sequence's LSN in pg_subscription_rel and mark the
+ * sequence as READY.
+ */
+ UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
+ seqinfo->page_lsn, false);
+
+ return COPYSEQ_SUCCESS;
+}
+
+/*
+ * Copy existing data of sequences from the publisher.
+ */
+static void
+copy_sequences(WalReceiverConn *conn)
+{
+ int cur_batch_base_index = 0;
+ int n_seqinfos = list_length(seqinfos);
+ List *mismatched_seqs_idx = NIL;
+ List *missing_seqs_idx = NIL;
+ List *insuffperm_seqs_idx = NIL;
+ StringInfo seqstr = makeStringInfo();
+ StringInfo cmd = makeStringInfo();
+ MemoryContext oldctx;
+
+#define MAX_SEQUENCES_SYNC_PER_BATCH 100
+
+ ereport(LOG,
+ errmsg("logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
+ MySubscription->name, n_seqinfos));
+
+ while (cur_batch_base_index < n_seqinfos)
+ {
+ Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
+ BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
+ int batch_size = 0;
+ int batch_succeeded_count = 0;
+ int batch_mismatched_count = 0;
+ int batch_skipped_count = 0;
+ int batch_insuffperm_count = 0;
+ int batch_missing_count;
+ Relation sequence_rel;
+
+ WalRcvExecResult *res;
+ TupleTableSlot *slot;
+
+ StartTransactionCommand();
+
+ for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
+ {
+ LogicalRepSequenceInfo *seqinfo =
+ (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
+
+ if (seqstr->len > 0)
+ appendStringInfoString(seqstr, ", ");
+
+ appendStringInfo(seqstr, "(\'%s\', \'%s\', %d)",
+ seqinfo->nspname, seqinfo->seqname, idx);
+
+ if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
+ break;
+ }
+
+ /*
+ * We deliberately avoid acquiring a local lock on the sequence before
+ * querying the publisher to prevent potential distributed deadlocks
+ * in bi-directional replication setups.
+ *
+ * Example scenario:
+ *
+ * - On each node, a background worker acquires a lock on a sequence
+ * as part of a sync operation.
+ *
+ * - Concurrently, a user transaction attempts to alter the same
+ * sequence, waiting on the background worker's lock.
+ *
+ * - Meanwhile, a query from the other node tries to access metadata
+ * that depends on the completion of the alter operation.
+ *
+ * - This creates a circular wait across nodes:
+ *
+ * Node-1: Query -> waits on Alter -> waits on Sync Worker
+ *
+ * Node-2: Query -> waits on Alter -> waits on Sync Worker
+ *
+ * Since each node only sees part of the wait graph, the deadlock may
+ * go undetected, leading to indefinite blocking.
+ *
+ * Note: Each entry in VALUES includes an index 'seqidx' that
+ * represents the sequence's position in the local 'seqinfos' list.
+ * This index is propagated to the query results and later used to
+ * directly map the fetched publisher sequence rows back to their
+ * corresponding local entries without relying on result order or name
+ * matching.
+ */
+ appendStringInfo(cmd,
+ "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
+ " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
+ " seq.seqmax, seq.seqcycle\n"
+ "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
+ "JOIN pg_namespace n ON n.nspname = s.schname\n"
+ "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
+ "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
+ "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
+ seqstr->data);
+
+ res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch sequence information from the publisher: %s",
+ res->err));
+
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ CopySeqResult sync_status;
+ LogicalRepSequenceInfo *seqinfo;
+ int seqidx;
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ sync_status = get_and_validate_seq_info(slot, &sequence_rel,
+ &seqinfo, &seqidx);
+ if (sync_status == COPYSEQ_SUCCESS)
+ sync_status = copy_sequence(seqinfo,
+ sequence_rel->rd_rel->relowner);
+
+ switch (sync_status)
+ {
+ case COPYSEQ_MISMATCH:
+
+ /*
+ * Remember mismatched sequences in a long-lived memory
+ * context, since these will be used after the transaction
+ * commits.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
+ seqidx);
+ MemoryContextSwitchTo(oldctx);
+ batch_mismatched_count++;
+ break;
+ case COPYSEQ_INSUFFICIENT_PERM:
+
+ /*
+ * Remember the sequences with insufficient privileges in
+ * a long-lived memory context, since these will be used
+ * after the transaction commits.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
+ seqidx);
+ MemoryContextSwitchTo(oldctx);
+ batch_insuffperm_count++;
+ break;
+ case COPYSEQ_SKIPPED:
+ ereport(LOG,
+ errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
+ seqinfo->nspname,
+ seqinfo->seqname));
+ batch_skipped_count++;
+ break;
+ case COPYSEQ_SUCCESS:
+ elog(DEBUG1,
+ "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
+ MySubscription->name, seqinfo->nspname,
+ seqinfo->seqname);
+ batch_succeeded_count++;
+ break;
+ }
+
+ if (sequence_rel)
+ table_close(sequence_rel, NoLock);
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+ walrcv_clear_result(res);
+ resetStringInfo(seqstr);
+ resetStringInfo(cmd);
+
+ batch_missing_count = batch_size - (batch_succeeded_count +
+ batch_mismatched_count +
+ batch_insuffperm_count +
+ batch_skipped_count);
+
+ elog(DEBUG1,
+ "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d skipped, %d mismatched, %d insufficient permission, %d missing from publisher",
+ MySubscription->name,
+ (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
+ batch_size, batch_succeeded_count, batch_skipped_count,
+ batch_mismatched_count, batch_insuffperm_count,
+ batch_missing_count);
+
+ /* Commit this batch, and prepare for next batch */
+ CommitTransactionCommand();
+
+ if (batch_missing_count)
+ {
+ for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
+ {
+ LogicalRepSequenceInfo *seqinfo =
+ (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
+
+ /* If the sequence was not found on publisher, record it */
+ if (!seqinfo->found_on_pub)
+ missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
+ }
+ }
+
+ /*
+ * cur_batch_base_index is not incremented sequentially because some
+ * sequences may be missing, and the number of fetched rows may not
+ * match the batch size.
+ */
+ cur_batch_base_index += batch_size;
+ }
+
+ /* Report permission issues, mismatches, or missing sequences */
+ if (insuffperm_seqs_idx || mismatched_seqs_idx || missing_seqs_idx)
+ report_sequence_errors(insuffperm_seqs_idx, mismatched_seqs_idx,
+ missing_seqs_idx);
+}
+
+/*
+ * Identifies sequences that require synchronization and initiates the
+ * synchronization process.
+ */
+static void
+LogicalRepSyncSequences(void)
+{
+ char *err;
+ bool must_use_password;
+ Relation rel;
+ HeapTuple tup;
+ ScanKeyData skey[2];
+ SysScanDesc scan;
+ Oid subid = MyLogicalRepWorker->subid;
+ StringInfoData app_name;
+
+ StartTransactionCommand();
+
+ rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_subscription_rel_srsubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(subid));
+
+ ScanKeyInit(&skey[1],
+ Anum_pg_subscription_rel_srsubstate,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(SUBREL_STATE_INIT));
+
+ scan = systable_beginscan(rel, InvalidOid, false,
+ NULL, 2, skey);
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_subscription_rel subrel;
+ LogicalRepSequenceInfo *seq;
+ Relation sequence_rel;
+ MemoryContext oldctx;
+
+ CHECK_FOR_INTERRUPTS();
+
+ subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+ sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
+
+ /* Skip if sequence was dropped concurrently */
+ if (!sequence_rel)
+ continue;
+
+ /* Skip if the relation is not a sequence */
+ if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
+ continue;
+
+ /*
+ * Worker needs to process sequences across transaction boundary, so
+ * allocate them under long-lived context.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ seq = palloc0_object(LogicalRepSequenceInfo);
+ seq->localrelid = subrel->srrelid;
+ seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
+ seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
+ seqinfos = lappend(seqinfos, seq);
+
+ MemoryContextSwitchTo(oldctx);
+
+ table_close(sequence_rel, NoLock);
+ }
+
+ /* Cleanup */
+ systable_endscan(scan);
+ table_close(rel, AccessShareLock);
+
+ CommitTransactionCommand();
+
+ /*
+ * Exit early if no catalog entries found, likely due to concurrent drops.
+ */
+ if (!seqinfos)
+ return;
+
+ /* Is the use of a password mandatory? */
+ must_use_password = MySubscription->passwordrequired &&
+ !MySubscription->ownersuperuser;
+
+ initStringInfo(&app_name);
+ appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
+ MySubscription->oid, GetSystemIdentifier());
+
+ /*
+ * Establish the connection to the publisher for sequence synchronization.
+ */
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true, true,
+ must_use_password,
+ app_name.data, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
+ MySubscription->name, err));
+
+ pfree(app_name.data);
+
+ copy_sequences(LogRepWorkerWalRcvConn);
+}
+
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if required.
+ *
+ * Note that we don't handle FATAL errors which are probably because of system
+ * resource error and are not repeatable.
+ */
+static void
+start_sequence_sync()
+{
+ Assert(am_sequencesync_worker());
+
+ PG_TRY();
+ {
+ /* Call initial sync. */
+ LogicalRepSyncSequences();
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed during sequence synchronization. Abort
+ * the current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid,
+ WORKERTYPE_SEQUENCESYNC);
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+}
+
+/* Logical Replication sequencesync worker entry point */
+void
+SequenceSyncWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+
+ SetupApplyOrSyncWorker(worker_slot);
+
+ start_sequence_sync();
+
+ FinishSyncWorker();
+}
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
index ae8c93859168..19f989410532 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -16,6 +16,7 @@
#include "catalog/pg_subscription_rel.h"
#include "pgstat.h"
+#include "replication/logicallauncher.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "utils/lsyscache.h"
@@ -48,6 +49,8 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE
pg_noreturn void
FinishSyncWorker(void)
{
+ Assert(am_sequencesync_worker() || am_tablesync_worker());
+
/*
* Commit any outstanding transaction. This is the usual case, unless
* there was nothing to do for the table.
@@ -61,16 +64,32 @@ FinishSyncWorker(void)
/* And flush all writes. */
XLogFlush(GetXLogWriteRecPtr());
- StartTransactionCommand();
- ereport(LOG,
- (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid))));
- CommitTransactionCommand();
+ if (am_sequencesync_worker())
+ {
+ ereport(LOG,
+ errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
+ MySubscription->name));
+
+ /*
+ * Find the leader apply worker and reset last_seqsync_start_time.
+ * This ensures that the apply worker can restart the sequence sync
+ * worker promptly whenever required.
+ */
+ logicalrep_reset_seqsync_start_time();
+ }
+ else
+ {
+ StartTransactionCommand();
+ ereport(LOG,
+ errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();
- /* Find the leader apply worker and signal it. */
- logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
- InvalidOid);
+ /* Find the leader apply worker and signal it. */
+ logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
+ InvalidOid);
+ }
/* Stop gracefully */
proc_exit(0);
@@ -86,7 +105,52 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
}
/*
- * Process possible state change(s) of relations that are being synchronized.
+ * Attempt to launch a sync worker for one or more sequences or a table, if
+ * a worker slot is available and the retry interval has elapsed.
+ *
+ * wtype: sync worker type.
+ * nsyncworkers: Number of currently running sync workers for the subscription.
+ * relid: InvalidOid for sequencesync worker, actual relid for tablesync
+ * worker.
+ * last_start_time: Pointer to the last start time of the worker.
+ */
+void
+launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
+ TimestampTz *last_start_time)
+{
+ TimestampTz now;
+
+ Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
+ (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
+
+ /* If there is a free sync worker slot, start a new sync worker */
+ if (nsyncworkers >= max_sync_workers_per_subscription)
+ return;
+
+ now = GetCurrentTimestamp();
+
+ if (!(*last_start_time) ||
+ TimestampDifferenceExceeds(*last_start_time, now,
+ wal_retrieve_retry_interval))
+ {
+ /*
+ * Set the last_start_time even if we fail to start the worker, so
+ * that we won't retry until wal_retrieve_retry_interval has elapsed.
+ */
+ *last_start_time = now;
+ (void) logicalrep_worker_launch(wtype,
+ MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ relid, DSM_HANDLE_INVALID, false);
+ }
+}
+
+/*
+ * Process possible state change(s) of relations that are being synchronized
+ * and start new tablesync workers for the newly added tables. Also, start a
+ * new sequencesync worker for the newly added sequences.
*/
void
ProcessSyncingRelations(XLogRecPtr current_lsn)
@@ -108,6 +172,12 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
case WORKERTYPE_APPLY:
ProcessSyncingTablesForApply(current_lsn);
+ ProcessSequencesForSync();
+ break;
+
+ case WORKERTYPE_SEQUENCESYNC:
+ /* Should never happen. */
+ elog(ERROR, "sequence synchronization worker is not expected to process relations");
break;
case WORKERTYPE_UNKNOWN:
@@ -117,17 +187,29 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
}
/*
- * Common code to fetch the up-to-date sync state info into the static lists.
+ * Common code to fetch the up-to-date sync state info for tables and sequences.
*
- * Returns true if subscription has 1 or more tables, else false.
+ * The pg_subscription_rel catalog is shared by tables and sequences. Changes
+ * to either sequences or tables can affect the validity of relation states, so
+ * we identify non-READY tables and non-READY sequences together to ensure
+ * consistency.
*
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
+ * has_pending_subtables: true if the subscription has one or more tables that
+ * are not in READY state, otherwise false.
+ * has_pending_subsequences: true if the subscription has one or more sequences
+ * that are not in READY state, otherwise false.
*/
-bool
-FetchRelationStates(bool *started_tx)
+void
+FetchRelationStates(bool *has_pending_subtables,
+ bool *has_pending_subsequences,
+ bool *started_tx)
{
+ /*
+ * has_subtables and has_subsequences_non_ready are declared as static,
+ * since the same value can be used until the system table is invalidated.
+ */
static bool has_subtables = false;
+ static bool has_subsequences_non_ready = false;
*started_tx = false;
@@ -135,10 +217,10 @@ FetchRelationStates(bool *started_tx)
{
MemoryContext oldctx;
List *rstates;
- ListCell *lc;
SubscriptionRelState *rstate;
relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
+ has_subsequences_non_ready = false;
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
@@ -150,17 +232,23 @@ FetchRelationStates(bool *started_tx)
*started_tx = true;
}
- /* Fetch tables that are in non-ready state. */
- rstates = GetSubscriptionRelations(MySubscription->oid, true, false,
+ /* Fetch tables and sequences that are in non-READY state. */
+ rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
true);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- foreach(lc, rstates)
+ foreach_ptr(SubscriptionRelState, subrel, rstates)
{
- rstate = palloc(sizeof(SubscriptionRelState));
- memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
- table_states_not_ready = lappend(table_states_not_ready, rstate);
+ if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
+ has_subsequences_non_ready = true;
+ else
+ {
+ rstate = palloc(sizeof(SubscriptionRelState));
+ memcpy(rstate, subrel, sizeof(SubscriptionRelState));
+ table_states_not_ready = lappend(table_states_not_ready,
+ rstate);
+ }
}
MemoryContextSwitchTo(oldctx);
@@ -185,5 +273,9 @@ FetchRelationStates(bool *started_tx)
relation_states_validity = SYNC_RELATIONS_STATE_VALID;
}
- return has_subtables;
+ if (has_pending_subtables)
+ *has_pending_subtables = has_subtables;
+
+ if (has_pending_subsequences)
+ *has_pending_subsequences = has_subsequences_non_ready;
}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 58c98488d7b7..dcc6124cc730 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -374,14 +374,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
};
static HTAB *last_start_times = NULL;
ListCell *lc;
- bool started_tx = false;
+ bool started_tx;
bool should_exit = false;
Relation rel = NULL;
Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */
- FetchRelationStates(&started_tx);
+ FetchRelationStates(NULL, NULL, &started_tx);
/*
* Prepare a hash table for tracking last start times of workers, to avoid
@@ -415,6 +415,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+ if (!started_tx)
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
+
if (rstate->state == SUBREL_STATE_SYNCDONE)
{
/*
@@ -428,11 +436,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
/*
* Remove the tablesync origin tracking if exists.
@@ -552,43 +555,19 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
*/
int nsyncworkers =
logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ struct tablesync_start_time_mapping *hentry;
+ bool found;
/* Now safe to release the LWLock */
LWLockRelease(LogicalRepWorkerLock);
- /*
- * If there are free sync worker slot(s), start a new sync
- * worker for the table.
- */
- if (nsyncworkers < max_sync_workers_per_subscription)
- {
- TimestampTz now = GetCurrentTimestamp();
- struct tablesync_start_time_mapping *hentry;
- bool found;
+ hentry = hash_search(last_start_times, &rstate->relid,
+ HASH_ENTER, &found);
+ if (!found)
+ hentry->last_start_time = 0;
- hentry = hash_search(last_start_times, &rstate->relid,
- HASH_ENTER, &found);
-
- if (!found ||
- TimestampDifferenceExceeds(hentry->last_start_time, now,
- wal_retrieve_retry_interval))
- {
- /*
- * Set the last_start_time even if we fail to start
- * the worker, so that we won't retry until
- * wal_retrieve_retry_interval has elapsed.
- */
- hentry->last_start_time = now;
- (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
- MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
- rstate->relid,
- DSM_HANDLE_INVALID,
- false);
- }
- }
+ launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
+ rstate->relid, &hentry->last_start_time);
}
}
}
@@ -1432,8 +1411,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
}
/*
- * Make sure that the copy command runs as the table owner, unless the
- * user has opted out of that behaviour.
+ * If the user did not opt to run as the owner of the subscription
+ * ('run_as_owner'), then copy the table as the owner of the table.
*/
run_as_owner = MySubscription->runasowner;
if (!run_as_owner)
@@ -1551,7 +1530,8 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
* idle state.
*/
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, false);
+ pgstat_report_subscription_error(MySubscription->oid,
+ WORKERTYPE_TABLESYNC);
PG_RE_THROW();
}
@@ -1596,7 +1576,7 @@ run_tablesync_worker()
/* Logical Replication Tablesync worker entry point */
void
-TablesyncWorkerMain(Datum main_arg)
+TableSyncWorkerMain(Datum main_arg)
{
int worker_slot = DatumGetInt32(main_arg);
@@ -1618,11 +1598,11 @@ TablesyncWorkerMain(Datum main_arg)
bool
AllTablesyncsReady(void)
{
- bool started_tx = false;
- bool has_subrels = false;
+ bool started_tx;
+ bool has_tables;
/* We need up-to-date sync state info for subscription tables here. */
- has_subrels = FetchRelationStates(&started_tx);
+ FetchRelationStates(&has_tables, NULL, &started_tx);
if (started_tx)
{
@@ -1634,7 +1614,7 @@ AllTablesyncsReady(void)
* Return false when there are no tables in subscription or not all tables
* are in ready state; true otherwise.
*/
- return has_subrels && (table_states_not_ready == NIL);
+ return has_tables && (table_states_not_ready == NIL);
}
/*
@@ -1649,10 +1629,10 @@ bool
HasSubscriptionTablesCached(void)
{
bool started_tx;
- bool has_subrels;
+ bool has_tables;
/* We need up-to-date subscription tables info here */
- has_subrels = FetchRelationStates(&started_tx);
+ FetchRelationStates(&has_tables, NULL, &started_tx);
if (started_tx)
{
@@ -1660,7 +1640,7 @@ HasSubscriptionTablesCached(void)
pgstat_report_stat(true);
}
- return has_subrels;
+ return has_tables;
}
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7edd1c9cf060..8b89eddb0cc7 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -702,6 +702,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
(rel->state == SUBREL_STATE_SYNCDONE &&
rel->statelsn <= remote_final_lsn));
+ case WORKERTYPE_SEQUENCESYNC:
+ /* Should never happen. */
+ elog(ERROR, "sequence synchronization worker is not expected to apply changes");
+ break;
+
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
@@ -1243,7 +1248,10 @@ apply_handle_commit(StringInfo s)
apply_handle_commit_internal(&commit_data);
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1365,7 +1373,10 @@ apply_handle_prepare(StringInfo s)
in_remote_transaction = false;
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(prepare_data.end_lsn);
/*
@@ -1421,7 +1432,10 @@ apply_handle_commit_prepared(StringInfo s)
store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
in_remote_transaction = false;
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(prepare_data.end_lsn);
clear_subscription_skip_lsn(prepare_data.end_lsn);
@@ -1487,7 +1501,10 @@ apply_handle_rollback_prepared(StringInfo s)
store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
in_remote_transaction = false;
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(rollback_data.rollback_end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1622,7 +1639,10 @@ apply_handle_stream_prepare(StringInfo s)
pgstat_report_stat(false);
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(prepare_data.end_lsn);
/*
@@ -2465,7 +2485,10 @@ apply_handle_stream_commit(StringInfo s)
break;
}
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -4137,7 +4160,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
AcceptInvalidationMessages();
maybe_reread_subscription();
- /* Process any table synchronization changes. */
+ /*
+ * Process any relations that are being synchronized in parallel
+ * and any newly added tables or sequences.
+ */
ProcessSyncingRelations(last_received);
}
@@ -5580,7 +5606,8 @@ start_apply(XLogRecPtr origin_startpos)
* idle state.
*/
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ pgstat_report_subscription_error(MySubscription->oid,
+ MyLogicalRepWorker->type);
PG_RE_THROW();
}
@@ -5700,8 +5727,8 @@ run_apply_worker()
}
/*
- * Common initialization for leader apply worker, parallel apply worker and
- * tablesync worker.
+ * Common initialization for leader apply worker, parallel apply worker,
+ * tablesync worker and sequencesync worker.
*
* Initialize the database connection, in-memory subscription and necessary
* config options.
@@ -5812,6 +5839,10 @@ InitializeLogRepWorker(void)
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
MySubscription->name,
get_rel_name(MyLogicalRepWorker->relid))));
+ else if (am_sequencesync_worker())
+ ereport(LOG,
+ (errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
+ MySubscription->name)));
else
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\" has started",
@@ -5831,14 +5862,16 @@ replorigin_reset(int code, Datum arg)
replorigin_session_origin_timestamp = 0;
}
-/* Common function to setup the leader apply or tablesync worker. */
+/*
+ * Common function to setup the leader apply, tablesync and sequencesync worker.
+ */
void
SetupApplyOrSyncWorker(int worker_slot)
{
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
- Assert(am_tablesync_worker() || am_leader_apply_worker());
+ Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker());
/* Setup signal handling */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -5921,9 +5954,12 @@ DisableSubscriptionAndExit(void)
RESUME_INTERRUPTS();
- /* Report the worker failed during either table synchronization or apply */
+ /*
+ * Report the worker failed during either sequence synchronization or
+ * table synchronization or apply.
+ */
pgstat_report_subscription_error(MyLogicalRepWorker->subid,
- !am_tablesync_worker());
+ MyLogicalRepWorker->type);
/* Disable the subscription */
StartTransactionCommand();
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index f9a1c831a07e..35916772b9dc 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -17,6 +17,7 @@
#include "postgres.h"
+#include "replication/worker_internal.h"
#include "utils/pgstat_internal.h"
@@ -24,7 +25,7 @@
* Report a subscription error.
*/
void
-pgstat_report_subscription_error(Oid subid, bool is_apply_error)
+pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
{
PgStat_EntryRef *entry_ref;
PgStat_BackendSubEntry *pending;
@@ -33,10 +34,25 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
InvalidOid, subid, NULL);
pending = entry_ref->pending;
- if (is_apply_error)
- pending->apply_error_count++;
- else
- pending->sync_error_count++;
+ switch (wtype)
+ {
+ case WORKERTYPE_APPLY:
+ pending->apply_error_count++;
+ break;
+
+ case WORKERTYPE_SEQUENCESYNC:
+ pending->seq_sync_error_count++;
+ break;
+
+ case WORKERTYPE_TABLESYNC:
+ pending->sync_error_count++;
+ break;
+
+ default:
+ /* Should never happen. */
+ Assert(0);
+ break;
+ }
}
/*
@@ -115,6 +131,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
#define SUB_ACC(fld) shsubent->stats.fld += localent->fld
SUB_ACC(apply_error_count);
+ SUB_ACC(seq_sync_error_count);
SUB_ACC(sync_error_count);
for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
SUB_ACC(conflict_count[i]);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index a710508979e4..1521d6e2ab43 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2203,7 +2203,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 13
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2221,25 +2221,27 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
OIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "seq_sync_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_update_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_origin_differs",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 12, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
@@ -2256,6 +2258,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
/* apply_error_count */
values[i++] = Int64GetDatum(subentry->apply_error_count);
+ /* seq_sync_error_count */
+ values[i++] = Int64GetDatum(subentry->seq_sync_error_count);
+
/* sync_error_count */
values[i++] = Int64GetDatum(subentry->sync_error_count);
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 25da769eb359..1128167c0251 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -2058,7 +2058,7 @@
},
{ name => 'max_sync_workers_per_subscription', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_SUBSCRIBERS',
- short_desc => 'Maximum number of table synchronization workers per subscription.',
+ short_desc => 'Maximum number of workers per subscription for synchronizing tables and sequences.',
variable => 'max_sync_workers_per_subscription',
boot_val => '2',
min => '0',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76b..5cf9e12fcb9a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -3433,7 +3433,7 @@
proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u',
prorettype => 'int8', proargtypes => 'regclass',
prosrc => 'pg_sequence_last_value' },
-{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump',
+{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump and sequence synchronization',
proname => 'pg_get_sequence_data', provolatile => 'v', proparallel => 'u',
prorettype => 'record', proargtypes => 'regclass',
proallargtypes => '{regclass,int8,bool,pg_lsn}', proargmodes => '{i,o,o,o}',
@@ -5704,9 +5704,9 @@
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,seq_sync_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9f88498ecd3f..9508e553b52a 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -82,6 +82,29 @@ typedef struct SubscriptionRelState
char state;
} SubscriptionRelState;
+/*
+ * Holds local sequence identity and corresponding publisher values used during
+ * sequence synchronization.
+ */
+typedef struct LogicalRepSequenceInfo
+{
+ /* Sequence information retrieved from the local node */
+ char *seqname;
+ char *nspname;
+ Oid localrelid;
+
+ /* Sequence information retrieved from the publisher node */
+ XLogRecPtr page_lsn;
+ int64 last_value;
+ bool is_called;
+
+ /*
+ * True if the sequence identified by (nspname, seqname) exists on the
+ * publisher.
+ */
+ bool found_on_pub;
+} LogicalRepSequenceInfo;
+
extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn, bool retain_lock);
extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 9ac0b67683d3..46b4d89dd6ea 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -60,6 +60,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void SequenceChangePersistence(Oid relid, char newrelpersistence);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
+extern void SetSequence(Oid relid, int64 next, bool is_called);
extern void ResetSequenceCaches(void);
extern void seq_redo(XLogReaderState *record);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 7ae503e71a27..a0610bb3e316 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -16,6 +16,7 @@
#include "portability/instr_time.h"
#include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */
#include "replication/conflict.h"
+#include "replication/worker_internal.h"
#include "utils/backend_progress.h" /* for backward compatibility */ /* IWYU pragma: export */
#include "utils/backend_status.h" /* for backward compatibility */ /* IWYU pragma: export */
#include "utils/pgstat_kind.h"
@@ -108,6 +109,7 @@ typedef struct PgStat_FunctionCallUsage
typedef struct PgStat_BackendSubEntry
{
PgStat_Counter apply_error_count;
+ PgStat_Counter seq_sync_error_count;
PgStat_Counter sync_error_count;
PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
} PgStat_BackendSubEntry;
@@ -416,6 +418,7 @@ typedef struct PgStat_SLRUStats
typedef struct PgStat_StatSubEntry
{
PgStat_Counter apply_error_count;
+ PgStat_Counter seq_sync_error_count;
PgStat_Counter sync_error_count;
PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
TimestampTz stat_reset_timestamp;
@@ -769,7 +772,8 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
* Functions in pgstat_subscription.c
*/
-extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_error(Oid subid,
+ LogicalRepWorkerType wtype);
extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
extern void pgstat_create_subscription(Oid subid);
extern void pgstat_drop_subscription(Oid subid);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 88912606e4d5..56fa79b648e7 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
extern void ApplyWorkerMain(Datum main_arg);
extern void ParallelApplyWorkerMain(Datum main_arg);
-extern void TablesyncWorkerMain(Datum main_arg);
+extern void TableSyncWorkerMain(Datum main_arg);
+extern void SequenceSyncWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
extern bool IsLogicalParallelApplyWorker(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index e23fa9a4514e..f081619f1513 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType
{
WORKERTYPE_UNKNOWN = 0,
WORKERTYPE_TABLESYNC,
+ WORKERTYPE_SEQUENCESYNC,
WORKERTYPE_APPLY,
WORKERTYPE_PARALLEL_APPLY,
} LogicalRepWorkerType;
@@ -106,6 +107,8 @@ typedef struct LogicalRepWorker
TimestampTz last_recv_time;
XLogRecPtr reply_lsn;
TimestampTz reply_time;
+
+ TimestampTz last_seqsync_start_time;
} LogicalRepWorker;
/*
@@ -271,6 +274,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+extern void logicalrep_reset_seqsync_start_time(void);
extern int logicalrep_sync_worker_count(Oid subid);
extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
@@ -282,11 +286,15 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state);
extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
+extern void ProcessSequencesForSync(void);
pg_noreturn extern void FinishSyncWorker(void);
extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
+extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
+ Oid relid, TimestampTz *last_start_time);
extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
-extern bool FetchRelationStates(bool *started_tx);
+extern void FetchRelationStates(bool *has_pending_subtables,
+ bool *has_pending_sequences, bool *started_tx);
extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid);
@@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->in_use && \
+#define isTableSyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
+#define isSequenceSyncWorker(worker) ((worker)->in_use && \
+ (worker)->type == WORKERTYPE_SEQUENCESYNC)
static inline bool
am_tablesync_worker(void)
{
- return isTablesyncWorker(MyLogicalRepWorker);
+ return isTableSyncWorker(MyLogicalRepWorker);
+}
+
+static inline bool
+am_sequencesync_worker(void)
+{
+ return isSequenceSyncWorker(MyLogicalRepWorker);
}
static inline bool
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 77e25ca029e5..fe20f613c3af 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2191,6 +2191,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
pg_stat_subscription_stats| SELECT ss.subid,
s.subname,
ss.apply_error_count,
+ ss.seq_sync_error_count,
ss.sync_error_count,
ss.confl_insert_exists,
ss.confl_update_origin_differs,
@@ -2202,7 +2203,7 @@ pg_stat_subscription_stats| SELECT ss.subid,
ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, seq_sync_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index 00a1c2fcd48c..23f3511f9a4f 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -21,7 +21,8 @@
sub create_sub_pub_w_errors
{
- my ($node_publisher, $node_subscriber, $db, $table_name) = @_;
+ my ($node_publisher, $node_subscriber, $db, $table_name, $sequence_name)
+ = @_;
# Initial table setup on both publisher and subscriber. On subscriber we
# create the same tables but with primary keys. Also, insert some data that
# will conflict with the data replicated from publisher later.
@@ -32,6 +33,7 @@ sub create_sub_pub_w_errors
CREATE TABLE $table_name(a int);
ALTER TABLE $table_name REPLICA IDENTITY FULL;
INSERT INTO $table_name VALUES (1);
+ CREATE SEQUENCE $sequence_name;
COMMIT;
]);
$node_subscriber->safe_psql(
@@ -40,45 +42,56 @@ sub create_sub_pub_w_errors
BEGIN;
CREATE TABLE $table_name(a int primary key);
INSERT INTO $table_name VALUES (1);
+ CREATE SEQUENCE $sequence_name INCREMENT BY 10;
COMMIT;
]);
# Set up publication.
my $pub_name = $table_name . '_pub';
+ my $pub_seq_name = $sequence_name . '_pub';
my $publisher_connstr = $node_publisher->connstr . qq( dbname=$db);
- $node_publisher->safe_psql($db,
- qq(CREATE PUBLICATION $pub_name FOR TABLE $table_name));
+ $node_publisher->safe_psql(
+ $db,
+ qq[
+ CREATE PUBLICATION $pub_name FOR TABLE $table_name;
+ CREATE PUBLICATION $pub_seq_name FOR ALL SEQUENCES;
+ ]);
# Create subscription. The tablesync for table on subscription will enter into
- # infinite error loop due to violating the unique constraint.
+ # infinite error loop due to violating the unique constraint. The sequencesync
+ # will also fail due to different sequence increment values on publisher and
+ # subscriber.
my $sub_name = $table_name . '_sub';
$node_subscriber->safe_psql($db,
- qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name)
+ qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name, $pub_seq_name)
);
$node_publisher->wait_for_catchup($sub_name);
- # Wait for the tablesync error to be reported.
+ # Wait for the tablesync and sequencesync error to be reported.
$node_subscriber->poll_query_until(
$db,
qq[
- SELECT sync_error_count > 0
- FROM pg_stat_subscription_stats
- WHERE subname = '$sub_name'
+ SELECT count(1) = 1 FROM pg_stat_subscription_stats
+ WHERE subname = '$sub_name' and seq_sync_error_count > 0 and sync_error_count > 0
])
or die
qq(Timed out while waiting for tablesync errors for subscription '$sub_name');
+ # Change the sequence start value on the subscriber so that it doesn't error out.
+ $node_subscriber->safe_psql($db,
+ qq(ALTER SEQUENCE $sequence_name INCREMENT 1));
+
# Truncate test_tab1 so that tablesync worker can continue.
$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
- # Wait for initial tablesync to finish.
+ # Wait for initial sync to finish.
$node_subscriber->poll_query_until(
$db,
qq[
- SELECT count(1) = 1 FROM pg_subscription_rel
- WHERE srrelid = '$table_name'::regclass AND srsubstate in ('r', 's')
+ SELECT count(1) = 2 FROM pg_subscription_rel
+ WHERE srrelid IN ('$table_name'::regclass, '$sequence_name'::regclass) AND srsubstate in ('r', 's')
])
or die
qq(Timed out while waiting for subscriber to synchronize data for table '$table_name'.);
@@ -136,14 +149,17 @@ sub create_sub_pub_w_errors
# Create the publication and subscription with sync and apply errors
my $table1_name = 'test_tab1';
+my $sequence1_name = 'test_seq1';
my ($pub1_name, $sub1_name) =
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
- $table1_name);
+ $table1_name, $sequence1_name);
-# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
+# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset
+# timestamp is NULL.
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
+ seq_sync_error_count > 0,
sync_error_count > 0,
confl_insert_exists > 0,
confl_delete_missing > 0,
@@ -151,8 +167,8 @@ sub create_sub_pub_w_errors
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t|t|t),
- qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+ qq(t|t|t|t|t|t),
+ qq(Check that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
);
# Reset a single subscription
@@ -160,10 +176,12 @@ sub create_sub_pub_w_errors
qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
);
-# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
+# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and
+# stats_reset timestamp is not NULL.
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
+ seq_sync_error_count = 0,
sync_error_count = 0,
confl_insert_exists = 0,
confl_delete_missing = 0,
@@ -171,8 +189,8 @@ sub create_sub_pub_w_errors
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t|t|t),
- qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+ qq(t|t|t|t|t|t),
+ qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
);
# Get reset timestamp
@@ -198,14 +216,17 @@ sub create_sub_pub_w_errors
# Make second subscription and publication
my $table2_name = 'test_tab2';
+my $sequence2_name = 'test_seq2';
my ($pub2_name, $sub2_name) =
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
- $table2_name);
+ $table2_name, $sequence2_name);
-# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
+# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0
+# and stats_reset timestamp is NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
+ seq_sync_error_count > 0,
sync_error_count > 0,
confl_insert_exists > 0,
confl_delete_missing > 0,
@@ -213,18 +234,20 @@ sub create_sub_pub_w_errors
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
- qq(t|t|t|t|t),
- qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
+ qq(t|t|t|t|t|t),
+ qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
);
# Reset all subscriptions
$node_subscriber->safe_psql($db,
qq(SELECT pg_stat_reset_subscription_stats(NULL)));
-# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
+# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and
+# stats_reset timestamp is not NULL.
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
+ seq_sync_error_count = 0,
sync_error_count = 0,
confl_insert_exists = 0,
confl_delete_missing = 0,
@@ -232,13 +255,14 @@ sub create_sub_pub_w_errors
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t|t|t),
- qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+ qq(t|t|t|t|t|t),
+ qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
);
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
+ seq_sync_error_count = 0,
sync_error_count = 0,
confl_insert_exists = 0,
confl_delete_missing = 0,
@@ -246,8 +270,8 @@ sub create_sub_pub_w_errors
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
- qq(t|t|t|t|t),
- qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+ qq(t|t|t|t|t|t),
+ qq(Confirm that apply errors, sequencesync errors, tablesync errors, errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
);
$reset_time1 = $node_subscriber->safe_psql($db,
diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl
index 557fc91c017b..ef40ca979b1e 100644
--- a/src/test/subscription/t/036_sequences.pl
+++ b/src/test/subscription/t/036_sequences.pl
@@ -1,7 +1,7 @@
# Copyright (c) 2025, PostgreSQL Global Development Group
-# This tests that sequences are registered to be synced to the subscriber
+# This tests that sequences are synced correctly to the subscriber
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
@@ -14,6 +14,7 @@
# Avoid checkpoint during the test, otherwise, extra values will be fetched for
# the sequences which will cause the test to fail randomly.
$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'checkpoint_timeout = 1h');
$node_publisher->start;
# Initialize subscriber node
@@ -28,7 +29,14 @@
);
$node_publisher->safe_psql('postgres', $ddl);
-# Setup the same structure on the subscriber
+# Setup the same structure on the subscriber, plus some extra sequences that
+# we'll create on the publisher later
+$ddl = qq(
+ CREATE TABLE regress_seq_test (v BIGINT);
+ CREATE SEQUENCE regress_s1;
+ CREATE SEQUENCE regress_s2;
+ CREATE SEQUENCE regress_s3;
+);
$node_subscriber->safe_psql('postgres', $ddl);
# Insert initial test data
@@ -46,10 +54,165 @@
"CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub"
);
-# Confirm sequences can be listed in pg_subscription_rel
-my $result = $node_subscriber->safe_psql('postgres',
- "SELECT relname, srsubstate FROM pg_class, pg_subscription_rel WHERE oid = srrelid"
+# Wait for initial sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the initial data on subscriber
+my $result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '100|0|t', 'initial test data replicated');
+
+##########
+## ALTER SUBSCRIPTION ... REFRESH PUBLICATION should cause sync of new
+# sequences of the publisher, but changes to existing sequences should
+# not be synced.
+##########
+
+# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1'
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ CREATE SEQUENCE regress_s2;
+ INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100);
+
+ -- Existing sequence
+ INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100);
+));
+
+# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION;
+));
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '200|31|t', 'Check sequence value in the publisher');
+
+# Check - existing sequence ('regress_s1') is not synced
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '100|0|t', 'REFRESH PUBLICATION will not sync existing sequence');
+
+# Check - newly published sequence ('regress_s2') is synced
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s2;
+));
+is($result, '100|0|t',
+ 'REFRESH PUBLICATION will sync newly published sequence');
+
+##########
+# Test: REFRESH SEQUENCES and REFRESH PUBLICATION (copy_data = off)
+#
+# 1. ALTER SUBSCRIPTION ... REFRESH SEQUENCES should re-synchronize all
+# existing sequences, but not synchronize newly added ones.
+# 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION with (copy_data = off) should
+# also not update sequence values for newly added sequences.
+##########
+
+# Create a new sequence 'regress_s3', and update the existing sequence
+# 'regress_s2'.
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ CREATE SEQUENCE regress_s3;
+ INSERT INTO regress_seq_test SELECT nextval('regress_s3') FROM generate_series(1,100);
+
+ -- Existing sequence
+ INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100);
+));
+
+# 1. Do ALTER SUBSCRIPTION ... REFRESH SEQUENCES
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;
+));
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check - existing sequences ('regress_s1' and 'regress_s2') are synced
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '200|0|t', 'REFRESH SEQUENCES will sync existing sequences');
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s2;
+));
+is($result, '200|0|t', 'REFRESH SEQUENCES will sync existing sequences');
+
+# Check - newly published sequence ('regress_s3') is not synced
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s3;
+));
+is($result, '1|0|f',
+ 'REFRESH SEQUENCES will not sync newly published sequence');
+
+# 2. Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data as false
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION WITH (copy_data = false);
+));
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check - newly published sequence ('regress_s3') is not synced when
+# (copy_data = off).
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s3;
+));
+is($result, '1|0|f',
+ 'REFRESH PUBLICATION will not sync newly published sequence with copy_data as off'
);
-is($result, 'regress_s1|i', "Sequence can be in pg_subscription_rel catalog");
+
+##########
+# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should report an error when:
+# a) sequence definitions differ between the publisher and subscriber, or
+# b) a sequence is missing on the publisher.
+##########
+
+# Create a new sequence 'regress_s4' whose START value is not the same in the
+# publisher and subscriber.
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ CREATE SEQUENCE regress_s4 START 1 INCREMENT 2;
+));
+
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+ CREATE SEQUENCE regress_s4 START 10 INCREMENT 2;
+));
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION");
+
+# Verify that an error is logged for parameter differences on sequence
+# ('regress_s4').
+$node_subscriber->wait_for_log(
+ qr/WARNING: ( [A-Z0-9]+:)? mismatched or renamed sequence on subscriber \("public.regress_s4"\)\n.*ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"/,
+ $log_offset);
+
+# Verify that an error is logged for the missing sequence ('regress_s4').
+$node_publisher->safe_psql('postgres', qq(DROP SEQUENCE regress_s4;));
+
+$node_subscriber->wait_for_log(
+ qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)\n.*ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"/,
+ $log_offset);
done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 018b5919cf66..2ca7b75af579 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -526,6 +526,7 @@ CopyMethod
CopyMultiInsertBuffer
CopyMultiInsertInfo
CopyOnErrorChoice
+CopySeqResult
CopySource
CopyStmt
CopyToRoutine
@@ -1629,6 +1630,7 @@ LogicalRepRelId
LogicalRepRelMapEntry
LogicalRepRelation
LogicalRepRollbackPreparedTxnData
+LogicalRepSequenceInfo
LogicalRepStreamAbortData
LogicalRepTupleData
LogicalRepTyp