diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 6c8a0f173c97..2fc634429802 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6568,7 +6568,7 @@ SCRAM-SHA-256$<iteration count>:&l (references pg_class.oid) - Reference to relation + Reference to table or sequence diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 06d1e4403b55..f5cbc68b938e 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5191,9 +5191,9 @@ ANY num_sync ( num_sync ( num_sync ( + + pg_get_sequence_data + + pg_get_sequence_data ( regclass ) + record + ( last_value bigint, + is_called bool, + page_lsn pg_lsn ) + + + Returns information about the sequence. last_value + indicates last sequence value set in sequence by nextval or setval, + is_called indicates whether the sequence has been + used, and page_lsn is the LSN corresponding to the + most recent WAL record that modified this sequence relation. + + + This function requires USAGE + or SELECT privilege on the sequence. + + diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index b01f5e998b2c..9e78c2f0465b 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -113,7 +113,9 @@ Publications may currently only contain tables or sequences. Objects must be added explicitly, except when a publication is created using FOR TABLES IN SCHEMA, FOR ALL TABLES, - or FOR ALL SEQUENCES. + or FOR ALL SEQUENCES. Unlike tables, sequences can be + synchronized at any time. For more information, see + . @@ -1745,6 +1747,194 @@ Publications: + + Replicating Sequences + + + To synchronize sequences from a publisher to a subscriber, first publish + them using + CREATE PUBLICATION ... FOR ALL SEQUENCES and then + on the subscriber: + + + + + + + use CREATE SUBSCRIPTION + to initially synchronize the published sequences. + + + + + use + ALTER SUBSCRIPTION ... REFRESH PUBLICATION + to synchronize only newly added sequences. + + + + + use + ALTER SUBSCRIPTION ... REFRESH SEQUENCES + to re-synchronize all sequences currently known to the subscription. + + + + + + + A new sequence synchronization worker will be started + after executing any of the above subscriber commands, and will exit once the + sequences are synchronized. + + + The ability to launch a sequence synchronization worker is limited by the + + max_sync_workers_per_subscription + configuration. + + + + Sequence Definition Mismatches + + The sequence synchronization worker validates that sequence definitions + match between publisher and subscriber. If mismatches exist, the worker + logs an error identifying them and exits. The apply worker continues + respawning the sequence synchronization worker until synchronization + succeeds. See also + wal_retrieve_retry_interval. + + + To resolve this, use + ALTER SEQUENCE + to align the subscriber's sequence parameters with those of the publisher. + + + + + Refreshing Stale Sequences + + Subscriber side sequence values may frequently become out of sync due to + updates on the publisher. + + + Subscriber sequence values drift out of sync as the publisher advances + them. Compare values between publisher and subscriber, then run + + ALTER SUBSCRIPTION ... REFRESH SEQUENCES to + resynchronize if necessary. + + + + + Examples + + + Create some sequences on the publisher. + +/* pub # */ CREATE SEQUENCE s1 START WITH 10 INCREMENT BY 1; +/* pub # */ CREATE SEQUENCE s2 START WITH 100 INCREMENT BY 10; + + + + Create the same sequences on the subscriber. + +/* sub # */ CREATE SEQUENCE s1 START WITH 10 INCREMENT BY 1 +/* sub # */ CREATE SEQUENCE s2 START WITH 100 INCREMENT BY 10; + + + + Update the sequences at the publisher side a few times. + +/* pub # */ SELECT nextval('s1'); + nextval +--------- + 10 +(1 row) +/* pub # */ SELECT nextval('s1'); + nextval +--------- + 11 +(1 row) +/* pub # */ SELECT nextval('s2'); + nextval +--------- + 100 +(1 row) +/* pub # */ SELECT nextval('s2'); + nextval +--------- + 110 +(1 row) + + + + Create a publication for the sequences. + +/* pub # */ CREATE PUBLICATION pub1 FOR ALL SEQUENCES; + + + + Subscribe to the publication. + +/* sub # */ CREATE SUBSCRIPTION sub1 +/* sub - */ CONNECTION 'host=localhost dbname=test_pub application_name=sub1' +/* sub - */ PUBLICATION pub1; + + + + Observe that initial sequence values are synchronized. + +/* sub # */ SELECT * FROM s1; + last_value | log_cnt | is_called +------------+---------+----------- + 11 | 31 | t +(1 row) + +/* sub # */ SELECT * FROM s2; + last_value | log_cnt | is_called +------------+---------+----------- + 110 | 31 | t +(1 row) + + + + Update the sequences at the publisher side. + +/* pub # */ SELECT nextval('s1'); + nextval +--------- + 12 +(1 row) +/* pub # */ SELECT nextval('s2'); + nextval +--------- + 120 +(1 row) + + + + Re-synchronize all sequences known to the subscriber using + + ALTER SUBSCRIPTION ... REFRESH SEQUENCES. + +/* sub # */ ALTER SUBSCRIPTION sub1 REFRESH SEQUENCES; + +/* sub # */ SELECT * FROM s1; + last_value | log_cnt | is_called +------------+---------+----------- + 12 | 30 | t +(1 row) + +/* sub # */ SELECT * FROM s2 + last_value | log_cnt | is_called +------------+---------+----------- + 120 | 30 | t +(1 row) + + + + Conflicts @@ -2090,16 +2280,19 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER - Sequence data is not replicated. The data in serial or identity columns - backed by sequences will of course be replicated as part of the table, - but the sequence itself would still show the start value on the - subscriber. If the subscriber is used as a read-only database, then this - should typically not be a problem. If, however, some kind of switchover - or failover to the subscriber database is intended, then the sequences - would need to be updated to the latest values, either by copying the - current data from the publisher (perhaps - using pg_dump) or by determining a sufficiently high - value from the tables themselves. + Incremental sequence changes are not replicated. Although the data in + serial or identity columns backed by sequences will be replicated as part + of the table, the sequences themselves do not replicate ongoing changes. + On the subscriber, a sequence will retain the last value it synchronized + from the publisher. If the subscriber is used as a read-only database, + then this should typically not be a problem. If, however, some kind of + switchover or failover to the subscriber database is intended, then the + sequences would need to be updated to the latest values, either by + executing + ALTER SUBSCRIPTION ... REFRESH SEQUENCES + or by copying the current data from the publisher (perhaps using + pg_dump) or by determining a sufficiently high value + from the tables themselves. @@ -2423,8 +2616,8 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER max_logical_replication_workers must be set to at least the number of subscriptions (for leader apply - workers), plus some reserve for the table synchronization workers and - parallel apply workers. + workers), plus some reserve for the parallel apply workers, and + table/sequence synchronization workers. @@ -2437,8 +2630,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER max_sync_workers_per_subscription - controls the amount of parallelism of the initial data copy during the - subscription initialization or when new tables are added. + controls how many tables can be synchronized in parallel during + subscription initialization or when new tables are added. One additional + worker is also needed for sequence synchronization. diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index f3bf527d5b4b..1dc0024ab921 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2045,8 +2045,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage Type of the subscription worker process. Possible types are - apply, parallel apply, and - table synchronization. + apply, parallel apply, + table synchronization, and + sequence synchronization. @@ -2192,6 +2193,15 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + sequence_sync_error_count bigint + + + Number of times an error occurred during the sequence synchronization + + + sync_error_count bigint diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 8ab3b7fbd377..27c06439f4fd 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -195,6 +195,12 @@ ALTER SUBSCRIPTION name RENAME TO < use ALTER SUBSCRIPTION ... REFRESH SEQUENCES. + + See for recommendations on how + to handle any warnings about sequence definition differences between + the publisher and the subscriber, which might occur when + copy_data = true. + See for details of how copy_data = true can interact with the @@ -225,6 +231,15 @@ ALTER SUBSCRIPTION name RENAME TO < data for all currently subscribed sequences. It does not add or remove sequences from the subscription to match the publication. + + See for + recommendations on how to handle any warnings about sequence definition + differences between the publisher and the subscriber. + + + See for recommendations on how to + identify and handle out-of-sync sequences. + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index ed82cf1809e5..d2ca3165f8a1 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -228,7 +228,7 @@ CREATE SUBSCRIPTION subscription_name for more about send/receive - functions). + functions). This parameter has no effect for sequences. @@ -265,6 +265,12 @@ CREATE SUBSCRIPTION subscription_namecopy_data = true can interact with the origin parameter. + + See + for recommendations on how to handle any warnings about sequence + definition differences between the publisher and the subscriber, + which might occur when copy_data = true. + @@ -280,6 +286,7 @@ CREATE SUBSCRIPTION subscription_name @@ -310,7 +317,8 @@ CREATE SUBSCRIPTION subscription_name setting within this subscription's apply worker processes. The default value - is off. + is off. This parameter has no effect for + sequences. @@ -340,7 +348,8 @@ CREATE SUBSCRIPTION subscription_name Specifies whether two-phase commit is enabled for this subscription. - The default is false. + The default is false. This parameter has no effect + for sequences. @@ -417,6 +426,7 @@ CREATE SUBSCRIPTION subscription_nameorigin to any means that the publisher sends changes regardless of their origin. The default is any. + This parameter has no effect for sequences. See for details of how @@ -449,7 +459,8 @@ CREATE SUBSCRIPTION subscription_name is enabled, and a physical replication slot named pg_conflict_detection is created on the subscriber to prevent the information for detecting - conflicts from being removed. + conflicts from being removed. This parameter has no effect for + sequences. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 15b233a37d8d..1945627ed88e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -354,7 +354,7 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, ObjectIdGetDatum(relid), ObjectIdGetDatum(subid)); if (!HeapTupleIsValid(tup)) - elog(ERROR, "subscription table %u in subscription %u does not exist", + elog(ERROR, "subscription relation %u in subscription %u does not exist", relid, subid); /* Update the tuple. */ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index dec8df4f8ee6..059e8778ca7c 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1415,6 +1415,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.subid, s.subname, ss.apply_error_count, + ss.seq_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index c23dee5231c0..8d671b7a29d6 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -112,7 +112,6 @@ static void init_params(ParseState *pstate, List *options, bool for_identity, bool *is_called, bool *need_seq_rewrite, List **owned_by); -static void do_setval(Oid relid, int64 next, bool iscalled); static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity); @@ -954,8 +953,8 @@ lastval(PG_FUNCTION_ARGS) * it is the only way to clear the is_called flag in an existing * sequence. */ -static void -do_setval(Oid relid, int64 next, bool iscalled) +void +SetSequence(Oid relid, int64 next, bool iscalled) { SeqTable elm; Relation seqrel; @@ -1056,7 +1055,7 @@ do_setval(Oid relid, int64 next, bool iscalled) /* * Implement the 2 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. */ Datum setval_oid(PG_FUNCTION_ARGS) @@ -1064,14 +1063,14 @@ setval_oid(PG_FUNCTION_ARGS) Oid relid = PG_GETARG_OID(0); int64 next = PG_GETARG_INT64(1); - do_setval(relid, next, true); + SetSequence(relid, next, true); PG_RETURN_INT64(next); } /* * Implement the 3 arg setval procedure. - * See do_setval for discussion. + * See SetSequence for discussion. */ Datum setval3_oid(PG_FUNCTION_ARGS) @@ -1080,7 +1079,7 @@ setval3_oid(PG_FUNCTION_ARGS) int64 next = PG_GETARG_INT64(1); bool iscalled = PG_GETARG_BOOL(2); - do_setval(relid, next, iscalled); + SetSequence(relid, next, iscalled); PG_RETURN_INT64(next); } @@ -1797,8 +1796,9 @@ pg_sequence_parameters(PG_FUNCTION_ARGS) /* * Return the sequence tuple along with its page LSN. * - * This is primarily intended for use by pg_dump to gather sequence data - * without needing to individually query each sequence relation. + * This is primarily used by pg_dump to efficiently collect sequence data + * without querying each sequence individually, and is also leveraged by + * logical replication while synchronizing sequences. */ Datum pg_get_sequence_data(PG_FUNCTION_ARGS) diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 1ad65c237c34..142a02eb5e95 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,7 +131,10 @@ static const struct "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, { - "TablesyncWorkerMain", TablesyncWorkerMain + "TableSyncWorkerMain", TableSyncWorkerMain + }, + { + "SequenceSyncWorkerMain", SequenceSyncWorkerMain } }; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c62c8c67521c..c719af1f8a94 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -26,6 +26,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + sequencesync.o \ slotsync.o \ snapbuild.o \ syncutils.o \ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 95b5cae9a552..bb49ad17dc7e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -248,9 +248,10 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, * Walks the workers array and searches for one that matches given worker type, * subscription id, and relation id. * - * For apply workers, the relid should be set to InvalidOid, as they manage - * changes across all tables. For table sync workers, the relid should be set - * to the OID of the relation being synchronized. + * For apply workers and sequencesync workers, the relid should be set to + * InvalidOid, as they manage changes across all tables. For tablesync + * workers, the relid should be set to the OID of the relation being + * synchronized. */ LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, @@ -334,6 +335,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, int nparallelapplyworkers; TimestampTz now; bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC); bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); /*---------- @@ -422,7 +424,8 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) + if ((is_tablesync_worker || is_sequencesync_worker) && + nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -478,6 +481,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, TIMESTAMP_NOBEGIN(worker->last_recv_time); worker->reply_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->reply_time); + worker->last_seqsync_start_time = 0; /* Before releasing lock, remember generation for future identification. */ generation = worker->generation; @@ -511,8 +515,16 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); break; + case WORKERTYPE_SEQUENCESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication sequencesync worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker"); + break; + case WORKERTYPE_TABLESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication tablesync worker for subscription %u sync %u", subid, @@ -848,6 +860,33 @@ logicalrep_launcher_onexit(int code, Datum arg) LogicalRepCtx->launcher_pid = 0; } +/* + * Reset the last_seqsync_start_time of the sequencesync worker in the + * subscription's apply worker. + * + * Note that this value is not stored in the sequencesync worker, because that + * has finished already and is about to exit. + */ +void +logicalrep_reset_seqsync_start_time(void) +{ + LogicalRepWorker *worker; + + /* + * The apply worker can't access last_seqsync_start_time concurrently, so + * it is okay to use SHARED lock here. See ProcessSequencesForSync(). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = logicalrep_worker_find(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid, + true); + if (worker) + worker->last_seqsync_start_time = 0; + + LWLockRelease(LogicalRepWorkerLock); +} + /* * Cleanup function. * @@ -896,7 +935,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isTablesyncWorker(w) && w->subid == subid) + if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w))) res++; } @@ -1610,7 +1649,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) + if (isTableSyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; @@ -1650,6 +1689,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) case WORKERTYPE_PARALLEL_APPLY: values[9] = CStringGetTextDatum("parallel apply"); break; + case WORKERTYPE_SEQUENCESYNC: + values[9] = CStringGetTextDatum("sequence synchronization"); + break; case WORKERTYPE_TABLESYNC: values[9] = CStringGetTextDatum("table synchronization"); break; diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 9283e996ef4a..a2268d8361ee 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -12,6 +12,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'sequencesync.c', 'slotsync.c', 'snapbuild.c', 'syncutils.c', diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c new file mode 100644 index 000000000000..22e3018f1eab --- /dev/null +++ b/src/backend/replication/logical/sequencesync.c @@ -0,0 +1,733 @@ +/*------------------------------------------------------------------------- + * sequencesync.c + * PostgreSQL logical replication: sequence synchronization + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/sequencesync.c + * + * NOTES + * This file contains code for sequence synchronization for + * logical replication. + * + * Sequences requiring synchronization are tracked in the pg_subscription_rel + * catalog. + * + * Sequences to be synchronized will be added with state INIT when either of + * the following commands is executed: + * CREATE SUBSCRIPTION + * ALTER SUBSCRIPTION ... REFRESH PUBLICATION + * + * Executing the following command resets all sequences in the subscription to + * state INIT, triggering re-synchronization: + * ALTER SUBSCRIPTION ... REFRESH SEQUENCES + * + * The apply worker periodically scans pg_subscription_rel for sequences in + * INIT state. When such sequences are found, it spawns a sequencesync worker + * to handle synchronization. + * + * A single sequencesync worker is responsible for synchronizing all sequences. + * It begins by retrieving the list of sequences that are flagged for + * synchronization, i.e., those in the INIT state. These sequences are then + * processed in batches, allowing multiple entries to be synchronized within a + * single transaction. The worker fetches the current sequence values and page + * LSNs from the remote publisher, updates the corresponding sequences on the + * local subscriber, and finally marks each sequence as READY upon successful + * synchronization. + * + * Sequence state transitions follow this pattern: + * INIT → READY + * + * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH + * sequences are synchronized per transaction. The locks on the sequence + * relation will be periodically released at each transaction commit. + * + * XXX: We didn't choose launcher process to maintain the launch of sequencesync + * worker as it didn't have database connection to access the sequences from the + * pg_subscription_rel system catalog that need to be synchronized. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "catalog/pg_sequence.h" +#include "catalog/pg_subscription_rel.h" +#include "commands/sequence.h" +#include "pgstat.h" +#include "postmaster/interrupt.h" +#include "replication/logicalworker.h" +#include "replication/worker_internal.h" +#include "utils/acl.h" +#include "utils/fmgroids.h" +#include "utils/guc.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/pg_lsn.h" +#include "utils/syscache.h" +#include "utils/usercontext.h" + +#define REMOTE_SEQ_COL_COUNT 10 + +typedef enum CopySeqResult +{ + COPYSEQ_SUCCESS, + COPYSEQ_MISMATCH, + COPYSEQ_INSUFFICIENT_PERM, + COPYSEQ_SKIPPED +} CopySeqResult; + +static List *seqinfos = NIL; + +/* + * Apply worker determines if sequence synchronization is needed. + * + * Start a sequencesync worker if one is not already running. The active + * sequencesync worker will handle all pending sequence synchronization. If any + * sequences remain unsynchronized after it exits, a new worker can be started + * in the next iteration. + */ +void +ProcessSequencesForSync(void) +{ + LogicalRepWorker *sequencesync_worker; + int nsyncworkers; + bool has_pending_sequences; + bool started_tx; + + FetchRelationStates(NULL, &has_pending_sequences, &started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + if (!has_pending_sequences) + return; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Check if there is a sequencesync worker already running? */ + sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC, + MyLogicalRepWorker->subid, + InvalidOid, true); + if (sequencesync_worker) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * Count running sync workers for this subscription, while we have the + * lock. + */ + nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + LWLockRelease(LogicalRepWorkerLock); + + /* + * It is okay to read/update last_seqsync_start_time here in apply worker + * as we have already ensured that sync worker doesn't exist. + */ + launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid, + &MyLogicalRepWorker->last_seqsync_start_time); +} + +/* + * get_sequences_string + * + * Build a comma-separated string of schema-qualified sequence names + * for the given list of sequence indexes. + */ +static void +get_sequences_string(List *seqindexes, StringInfo buf) +{ + resetStringInfo(buf); + foreach_int(seqidx, seqindexes) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx); + + if (buf->len > 0) + appendStringInfoString(buf, ", "); + + appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname); + } +} + +/* + * report_sequence_errors + * + * Report discrepancies found during sequence synchronization between + * the publisher and subscriber. Emits warnings for: + * a) insufficient privileges + * b) mismatched definitions or concurrent rename + * c) missing sequences on the subscriber + * Then raises an ERROR to indicate synchronization failure. + */ +static void +report_sequence_errors(List *insuffperm_seqs_idx, List *mismatched_seqs_idx, + List *missing_seqs_idx) +{ + StringInfo seqstr = makeStringInfo(); + + if (insuffperm_seqs_idx) + { + get_sequences_string(insuffperm_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("insufficient privileges on sequence (%s)", + "insufficient privileges on sequences (%s)", + list_length(insuffperm_seqs_idx), + seqstr->data)); + } + + if (mismatched_seqs_idx) + { + get_sequences_string(mismatched_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("mismatched or renamed sequence on subscriber (%s)", + "mismatched or renamed sequences on subscriber (%s)", + list_length(mismatched_seqs_idx), + seqstr->data)); + } + + if (missing_seqs_idx) + { + get_sequences_string(missing_seqs_idx, seqstr); + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_plural("missing sequence on publisher (%s)", + "missing sequences on publisher (%s)", + list_length(missing_seqs_idx), + seqstr->data)); + } + + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication sequence synchronization failed for subscription \"%s\"", + MySubscription->name)); +} + +/* + * get_and_validate_seq_info + * + * Extracts remote sequence information from the tuple slot received from the + * publisher, and validates it against the corresponding local sequence + * definition. + */ +static CopySeqResult +get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, + LogicalRepSequenceInfo **seqinfo, int *seqidx) +{ + bool isnull; + int col = 0; + Oid remote_typid; + int64 remote_start; + int64 remote_increment; + int64 remote_min; + int64 remote_max; + bool remote_cycle; + CopySeqResult result = COPYSEQ_SUCCESS; + HeapTuple tup; + Form_pg_sequence local_seq; + LogicalRepSequenceInfo *seqinfo_local; + + *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Identify the corresponding local sequence for the given index. */ + *seqinfo = seqinfo_local = + (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx); + + seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull)); + Assert(!isnull); + + /* Sanity check */ + Assert(col == REMOTE_SEQ_COL_COUNT); + + seqinfo_local->found_on_pub = true; + + *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock); + + /* Sequence was concurrently dropped? */ + if (!*sequence_rel) + return COPYSEQ_SKIPPED; + + tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid)); + + /* Sequence was concurrently dropped? */ + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for sequence %u", + seqinfo_local->localrelid); + + local_seq = (Form_pg_sequence) GETSTRUCT(tup); + + /* Sequence parameters for remote/local are the same? */ + if (local_seq->seqtypid != remote_typid || + local_seq->seqstart != remote_start || + local_seq->seqincrement != remote_increment || + local_seq->seqmin != remote_min || + local_seq->seqmax != remote_max || + local_seq->seqcycle != remote_cycle) + result = COPYSEQ_MISMATCH; + + /* Sequence was concurrently renamed? */ + if (strcmp(seqinfo_local->nspname, + get_namespace_name(RelationGetNamespace(*sequence_rel))) || + strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel))) + result = COPYSEQ_MISMATCH; + + ReleaseSysCache(tup); + return result; +} + +/* + * Apply remote sequence state to local sequence and mark it as + * synchronized (READY). + */ +static CopySeqResult +copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner) +{ + UserContext ucxt; + AclResult aclresult; + bool run_as_owner = MySubscription->runasowner; + Oid seqoid = seqinfo->localrelid; + + /* + * If the user did not opt to run as the owner of the subscription + * ('run_as_owner'), then copy the sequence as the owner of the sequence. + */ + if (!run_as_owner) + SwitchToUntrustedUser(seqowner, &ucxt); + + aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE); + + if (aclresult != ACLCHECK_OK) + { + if (!run_as_owner) + RestoreUserContext(&ucxt); + + return COPYSEQ_INSUFFICIENT_PERM; + } + + SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called); + + if (!run_as_owner) + RestoreUserContext(&ucxt); + + /* + * Record the remote sequence's LSN in pg_subscription_rel and mark the + * sequence as READY. + */ + UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY, + seqinfo->page_lsn, false); + + return COPYSEQ_SUCCESS; +} + +/* + * Copy existing data of sequences from the publisher. + */ +static void +copy_sequences(WalReceiverConn *conn) +{ + int cur_batch_base_index = 0; + int n_seqinfos = list_length(seqinfos); + List *mismatched_seqs_idx = NIL; + List *missing_seqs_idx = NIL; + List *insuffperm_seqs_idx = NIL; + StringInfo seqstr = makeStringInfo(); + StringInfo cmd = makeStringInfo(); + MemoryContext oldctx; + +#define MAX_SEQUENCES_SYNC_PER_BATCH 100 + + ereport(LOG, + errmsg("logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d", + MySubscription->name, n_seqinfos)); + + while (cur_batch_base_index < n_seqinfos) + { + Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID, + BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID}; + int batch_size = 0; + int batch_succeeded_count = 0; + int batch_mismatched_count = 0; + int batch_skipped_count = 0; + int batch_insuffperm_count = 0; + int batch_missing_count; + Relation sequence_rel; + + WalRcvExecResult *res; + TupleTableSlot *slot; + + StartTransactionCommand(); + + for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); + + if (seqstr->len > 0) + appendStringInfoString(seqstr, ", "); + + appendStringInfo(seqstr, "(\'%s\', \'%s\', %d)", + seqinfo->nspname, seqinfo->seqname, idx); + + if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH) + break; + } + + /* + * We deliberately avoid acquiring a local lock on the sequence before + * querying the publisher to prevent potential distributed deadlocks + * in bi-directional replication setups. + * + * Example scenario: + * + * - On each node, a background worker acquires a lock on a sequence + * as part of a sync operation. + * + * - Concurrently, a user transaction attempts to alter the same + * sequence, waiting on the background worker's lock. + * + * - Meanwhile, a query from the other node tries to access metadata + * that depends on the completion of the alter operation. + * + * - This creates a circular wait across nodes: + * + * Node-1: Query -> waits on Alter -> waits on Sync Worker + * + * Node-2: Query -> waits on Alter -> waits on Sync Worker + * + * Since each node only sees part of the wait graph, the deadlock may + * go undetected, leading to indefinite blocking. + * + * Note: Each entry in VALUES includes an index 'seqidx' that + * represents the sequence's position in the local 'seqinfos' list. + * This index is propagated to the query results and later used to + * directly map the fetched publisher sequence rows back to their + * corresponding local entries without relying on result order or name + * matching. + */ + appendStringInfo(cmd, + "SELECT s.seqidx, ps.*, seq.seqtypid,\n" + " seq.seqstart, seq.seqincrement, seq.seqmin,\n" + " seq.seqmax, seq.seqcycle\n" + "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n" + "JOIN pg_namespace n ON n.nspname = s.schname\n" + "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n" + "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n" + "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n", + seqstr->data); + + res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow); + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch sequence information from the publisher: %s", + res->err)); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + CopySeqResult sync_status; + LogicalRepSequenceInfo *seqinfo; + int seqidx; + + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + sync_status = get_and_validate_seq_info(slot, &sequence_rel, + &seqinfo, &seqidx); + if (sync_status == COPYSEQ_SUCCESS) + sync_status = copy_sequence(seqinfo, + sequence_rel->rd_rel->relowner); + + switch (sync_status) + { + case COPYSEQ_MISMATCH: + + /* + * Remember mismatched sequences in a long-lived memory + * context, since these will be used after the transaction + * commits. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + mismatched_seqs_idx = lappend_int(mismatched_seqs_idx, + seqidx); + MemoryContextSwitchTo(oldctx); + batch_mismatched_count++; + break; + case COPYSEQ_INSUFFICIENT_PERM: + + /* + * Remember the sequences with insufficient privileges in + * a long-lived memory context, since these will be used + * after the transaction commits. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx, + seqidx); + MemoryContextSwitchTo(oldctx); + batch_insuffperm_count++; + break; + case COPYSEQ_SKIPPED: + ereport(LOG, + errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently", + seqinfo->nspname, + seqinfo->seqname)); + batch_skipped_count++; + break; + case COPYSEQ_SUCCESS: + elog(DEBUG1, + "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished", + MySubscription->name, seqinfo->nspname, + seqinfo->seqname); + batch_succeeded_count++; + break; + } + + if (sequence_rel) + table_close(sequence_rel, NoLock); + } + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + resetStringInfo(seqstr); + resetStringInfo(cmd); + + batch_missing_count = batch_size - (batch_succeeded_count + + batch_mismatched_count + + batch_insuffperm_count + + batch_skipped_count); + + elog(DEBUG1, + "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d skipped, %d mismatched, %d insufficient permission, %d missing from publisher", + MySubscription->name, + (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, + batch_size, batch_succeeded_count, batch_skipped_count, + batch_mismatched_count, batch_insuffperm_count, + batch_missing_count); + + /* Commit this batch, and prepare for next batch */ + CommitTransactionCommand(); + + if (batch_missing_count) + { + for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++) + { + LogicalRepSequenceInfo *seqinfo = + (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); + + /* If the sequence was not found on publisher, record it */ + if (!seqinfo->found_on_pub) + missing_seqs_idx = lappend_int(missing_seqs_idx, idx); + } + } + + /* + * cur_batch_base_index is not incremented sequentially because some + * sequences may be missing, and the number of fetched rows may not + * match the batch size. + */ + cur_batch_base_index += batch_size; + } + + /* Report permission issues, mismatches, or missing sequences */ + if (insuffperm_seqs_idx || mismatched_seqs_idx || missing_seqs_idx) + report_sequence_errors(insuffperm_seqs_idx, mismatched_seqs_idx, + missing_seqs_idx); +} + +/* + * Identifies sequences that require synchronization and initiates the + * synchronization process. + */ +static void +LogicalRepSyncSequences(void) +{ + char *err; + bool must_use_password; + Relation rel; + HeapTuple tup; + ScanKeyData skey[2]; + SysScanDesc scan; + Oid subid = MyLogicalRepWorker->subid; + StringInfoData app_name; + + StartTransactionCommand(); + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[0], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + ScanKeyInit(&skey[1], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(SUBREL_STATE_INIT)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, 2, skey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + LogicalRepSequenceInfo *seq; + Relation sequence_rel; + MemoryContext oldctx; + + CHECK_FOR_INTERRUPTS(); + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock); + + /* Skip if sequence was dropped concurrently */ + if (!sequence_rel) + continue; + + /* Skip if the relation is not a sequence */ + if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE) + continue; + + /* + * Worker needs to process sequences across transaction boundary, so + * allocate them under long-lived context. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + + seq = palloc0_object(LogicalRepSequenceInfo); + seq->localrelid = subrel->srrelid; + seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel)); + seq->seqname = pstrdup(RelationGetRelationName(sequence_rel)); + seqinfos = lappend(seqinfos, seq); + + MemoryContextSwitchTo(oldctx); + + table_close(sequence_rel, NoLock); + } + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + CommitTransactionCommand(); + + /* + * Exit early if no catalog entries found, likely due to concurrent drops. + */ + if (!seqinfos) + return; + + /* Is the use of a password mandatory? */ + must_use_password = MySubscription->passwordrequired && + !MySubscription->ownersuperuser; + + initStringInfo(&app_name); + appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT, + MySubscription->oid, GetSystemIdentifier()); + + /* + * Establish the connection to the publisher for sequence synchronization. + */ + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, true, + must_use_password, + app_name.data, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s", + MySubscription->name, err)); + + pfree(app_name.data); + + copy_sequences(LogRepWorkerWalRcvConn); +} + +/* + * Execute the initial sync with error handling. Disable the subscription, + * if required. + * + * Note that we don't handle FATAL errors which are probably because of system + * resource error and are not repeatable. + */ +static void +start_sequence_sync() +{ + Assert(am_sequencesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + LogicalRepSyncSequences(); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report the worker failed during sequence synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_SEQUENCESYNC); + + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + +/* Logical Replication sequencesync worker entry point */ +void +SequenceSyncWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + + SetupApplyOrSyncWorker(worker_slot); + + start_sequence_sync(); + + FinishSyncWorker(); +} diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index ae8c93859168..19f989410532 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -16,6 +16,7 @@ #include "catalog/pg_subscription_rel.h" #include "pgstat.h" +#include "replication/logicallauncher.h" #include "replication/worker_internal.h" #include "storage/ipc.h" #include "utils/lsyscache.h" @@ -48,6 +49,8 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE pg_noreturn void FinishSyncWorker(void) { + Assert(am_sequencesync_worker() || am_tablesync_worker()); + /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. @@ -61,16 +64,32 @@ FinishSyncWorker(void) /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); - StartTransactionCommand(); - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); - CommitTransactionCommand(); + if (am_sequencesync_worker()) + { + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", + MySubscription->name)); + + /* + * Find the leader apply worker and reset last_seqsync_start_time. + * This ensures that the apply worker can restart the sequence sync + * worker promptly whenever required. + */ + logicalrep_reset_seqsync_start_time(); + } + else + { + StartTransactionCommand(); + ereport(LOG, + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + CommitTransactionCommand(); - /* Find the leader apply worker and signal it. */ - logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, - InvalidOid); + /* Find the leader apply worker and signal it. */ + logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, + InvalidOid); + } /* Stop gracefully */ proc_exit(0); @@ -86,7 +105,52 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue) } /* - * Process possible state change(s) of relations that are being synchronized. + * Attempt to launch a sync worker for one or more sequences or a table, if + * a worker slot is available and the retry interval has elapsed. + * + * wtype: sync worker type. + * nsyncworkers: Number of currently running sync workers for the subscription. + * relid: InvalidOid for sequencesync worker, actual relid for tablesync + * worker. + * last_start_time: Pointer to the last start time of the worker. + */ +void +launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, + TimestampTz *last_start_time) +{ + TimestampTz now; + + Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) || + (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid))); + + /* If there is a free sync worker slot, start a new sync worker */ + if (nsyncworkers >= max_sync_workers_per_subscription) + return; + + now = GetCurrentTimestamp(); + + if (!(*last_start_time) || + TimestampDifferenceExceeds(*last_start_time, now, + wal_retrieve_retry_interval)) + { + /* + * Set the last_start_time even if we fail to start the worker, so + * that we won't retry until wal_retrieve_retry_interval has elapsed. + */ + *last_start_time = now; + (void) logicalrep_worker_launch(wtype, + MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + relid, DSM_HANDLE_INVALID, false); + } +} + +/* + * Process possible state change(s) of relations that are being synchronized + * and start new tablesync workers for the newly added tables. Also, start a + * new sequencesync worker for the newly added sequences. */ void ProcessSyncingRelations(XLogRecPtr current_lsn) @@ -108,6 +172,12 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) case WORKERTYPE_APPLY: ProcessSyncingTablesForApply(current_lsn); + ProcessSequencesForSync(); + break; + + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "sequence synchronization worker is not expected to process relations"); break; case WORKERTYPE_UNKNOWN: @@ -117,17 +187,29 @@ ProcessSyncingRelations(XLogRecPtr current_lsn) } /* - * Common code to fetch the up-to-date sync state info into the static lists. + * Common code to fetch the up-to-date sync state info for tables and sequences. * - * Returns true if subscription has 1 or more tables, else false. + * The pg_subscription_rel catalog is shared by tables and sequences. Changes + * to either sequences or tables can affect the validity of relation states, so + * we identify non-READY tables and non-READY sequences together to ensure + * consistency. * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. + * has_pending_subtables: true if the subscription has one or more tables that + * are not in READY state, otherwise false. + * has_pending_subsequences: true if the subscription has one or more sequences + * that are not in READY state, otherwise false. */ -bool -FetchRelationStates(bool *started_tx) +void +FetchRelationStates(bool *has_pending_subtables, + bool *has_pending_subsequences, + bool *started_tx) { + /* + * has_subtables and has_subsequences_non_ready are declared as static, + * since the same value can be used until the system table is invalidated. + */ static bool has_subtables = false; + static bool has_subsequences_non_ready = false; *started_tx = false; @@ -135,10 +217,10 @@ FetchRelationStates(bool *started_tx) { MemoryContext oldctx; List *rstates; - ListCell *lc; SubscriptionRelState *rstate; relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; + has_subsequences_non_ready = false; /* Clean the old lists. */ list_free_deep(table_states_not_ready); @@ -150,17 +232,23 @@ FetchRelationStates(bool *started_tx) *started_tx = true; } - /* Fetch tables that are in non-ready state. */ - rstates = GetSubscriptionRelations(MySubscription->oid, true, false, + /* Fetch tables and sequences that are in non-READY state. */ + rstates = GetSubscriptionRelations(MySubscription->oid, true, true, true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); - foreach(lc, rstates) + foreach_ptr(SubscriptionRelState, subrel, rstates) { - rstate = palloc(sizeof(SubscriptionRelState)); - memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); - table_states_not_ready = lappend(table_states_not_ready, rstate); + if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE) + has_subsequences_non_ready = true; + else + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, subrel, sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, + rstate); + } } MemoryContextSwitchTo(oldctx); @@ -185,5 +273,9 @@ FetchRelationStates(bool *started_tx) relation_states_validity = SYNC_RELATIONS_STATE_VALID; } - return has_subtables; + if (has_pending_subtables) + *has_pending_subtables = has_subtables; + + if (has_pending_subsequences) + *has_pending_subsequences = has_subsequences_non_ready; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 58c98488d7b7..dcc6124cc730 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -374,14 +374,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) }; static HTAB *last_start_times = NULL; ListCell *lc; - bool started_tx = false; + bool started_tx; bool should_exit = false; Relation rel = NULL; Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ - FetchRelationStates(&started_tx); + FetchRelationStates(NULL, NULL, &started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid @@ -415,6 +415,14 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE); + if (rstate->state == SUBREL_STATE_SYNCDONE) { /* @@ -428,11 +436,6 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } /* * Remove the tablesync origin tracking if exists. @@ -552,43 +555,19 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn) */ int nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + struct tablesync_start_time_mapping *hentry; + bool found; /* Now safe to release the LWLock */ LWLockRelease(LogicalRepWorkerLock); - /* - * If there are free sync worker slot(s), start a new sync - * worker for the table. - */ - if (nsyncworkers < max_sync_workers_per_subscription) - { - TimestampTz now = GetCurrentTimestamp(); - struct tablesync_start_time_mapping *hentry; - bool found; + hentry = hash_search(last_start_times, &rstate->relid, + HASH_ENTER, &found); + if (!found) + hentry->last_start_time = 0; - hentry = hash_search(last_start_times, &rstate->relid, - HASH_ENTER, &found); - - if (!found || - TimestampDifferenceExceeds(hentry->last_start_time, now, - wal_retrieve_retry_interval)) - { - /* - * Set the last_start_time even if we fail to start - * the worker, so that we won't retry until - * wal_retrieve_retry_interval has elapsed. - */ - hentry->last_start_time = now; - (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC, - MyLogicalRepWorker->dbid, - MySubscription->oid, - MySubscription->name, - MyLogicalRepWorker->userid, - rstate->relid, - DSM_HANDLE_INVALID, - false); - } - } + launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers, + rstate->relid, &hentry->last_start_time); } } } @@ -1432,8 +1411,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) } /* - * Make sure that the copy command runs as the table owner, unless the - * user has opted out of that behaviour. + * If the user did not opt to run as the owner of the subscription + * ('run_as_owner'), then copy the table as the owner of the table. */ run_as_owner = MySubscription->runasowner; if (!run_as_owner) @@ -1551,7 +1530,8 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_TABLESYNC); PG_RE_THROW(); } @@ -1596,7 +1576,7 @@ run_tablesync_worker() /* Logical Replication Tablesync worker entry point */ void -TablesyncWorkerMain(Datum main_arg) +TableSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); @@ -1618,11 +1598,11 @@ TablesyncWorkerMain(Datum main_arg) bool AllTablesyncsReady(void) { - bool started_tx = false; - bool has_subrels = false; + bool started_tx; + bool has_tables; /* We need up-to-date sync state info for subscription tables here. */ - has_subrels = FetchRelationStates(&started_tx); + FetchRelationStates(&has_tables, NULL, &started_tx); if (started_tx) { @@ -1634,7 +1614,7 @@ AllTablesyncsReady(void) * Return false when there are no tables in subscription or not all tables * are in ready state; true otherwise. */ - return has_subrels && (table_states_not_ready == NIL); + return has_tables && (table_states_not_ready == NIL); } /* @@ -1649,10 +1629,10 @@ bool HasSubscriptionTablesCached(void) { bool started_tx; - bool has_subrels; + bool has_tables; /* We need up-to-date subscription tables info here */ - has_subrels = FetchRelationStates(&started_tx); + FetchRelationStates(&has_tables, NULL, &started_tx); if (started_tx) { @@ -1660,7 +1640,7 @@ HasSubscriptionTablesCached(void) pgstat_report_stat(true); } - return has_subrels; + return has_tables; } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7edd1c9cf060..8b89eddb0cc7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -702,6 +702,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "sequence synchronization worker is not expected to apply changes"); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -1243,7 +1248,10 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1365,7 +1373,10 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -1421,7 +1432,10 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1487,7 +1501,10 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1622,7 +1639,10 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -2465,7 +2485,10 @@ apply_handle_stream_commit(StringInfo s) break; } - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -4137,7 +4160,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) AcceptInvalidationMessages(); maybe_reread_subscription(); - /* Process any table synchronization changes. */ + /* + * Process any relations that are being synchronized in parallel + * and any newly added tables or sequences. + */ ProcessSyncingRelations(last_received); } @@ -5580,7 +5606,8 @@ start_apply(XLogRecPtr origin_startpos) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + pgstat_report_subscription_error(MySubscription->oid, + MyLogicalRepWorker->type); PG_RE_THROW(); } @@ -5700,8 +5727,8 @@ run_apply_worker() } /* - * Common initialization for leader apply worker, parallel apply worker and - * tablesync worker. + * Common initialization for leader apply worker, parallel apply worker, + * tablesync worker and sequencesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. @@ -5812,6 +5839,10 @@ InitializeLogRepWorker(void) (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + else if (am_sequencesync_worker()) + ereport(LOG, + (errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started", + MySubscription->name))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", @@ -5831,14 +5862,16 @@ replorigin_reset(int code, Datum arg) replorigin_session_origin_timestamp = 0; } -/* Common function to setup the leader apply or tablesync worker. */ +/* + * Common function to setup the leader apply, tablesync and sequencesync worker. + */ void SetupApplyOrSyncWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); - Assert(am_tablesync_worker() || am_leader_apply_worker()); + Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker()); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -5921,9 +5954,12 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); - /* Report the worker failed during either table synchronization or apply */ + /* + * Report the worker failed during either sequence synchronization or + * table synchronization or apply. + */ pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); + MyLogicalRepWorker->type); /* Disable the subscription */ StartTransactionCommand(); diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index f9a1c831a07e..35916772b9dc 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -17,6 +17,7 @@ #include "postgres.h" +#include "replication/worker_internal.h" #include "utils/pgstat_internal.h" @@ -24,7 +25,7 @@ * Report a subscription error. */ void -pgstat_report_subscription_error(Oid subid, bool is_apply_error) +pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) { PgStat_EntryRef *entry_ref; PgStat_BackendSubEntry *pending; @@ -33,10 +34,25 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error) InvalidOid, subid, NULL); pending = entry_ref->pending; - if (is_apply_error) - pending->apply_error_count++; - else - pending->sync_error_count++; + switch (wtype) + { + case WORKERTYPE_APPLY: + pending->apply_error_count++; + break; + + case WORKERTYPE_SEQUENCESYNC: + pending->seq_sync_error_count++; + break; + + case WORKERTYPE_TABLESYNC: + pending->sync_error_count++; + break; + + default: + /* Should never happen. */ + Assert(0); + break; + } } /* @@ -115,6 +131,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) #define SUB_ACC(fld) shsubent->stats.fld += localent->fld SUB_ACC(apply_error_count); + SUB_ACC(seq_sync_error_count); SUB_ACC(sync_error_count); for (int i = 0; i < CONFLICT_NUM_TYPES; i++) SUB_ACC(conflict_count[i]); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index a710508979e4..1521d6e2ab43 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2203,7 +2203,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 13 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2221,25 +2221,27 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "seq_sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2256,6 +2258,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) /* apply_error_count */ values[i++] = Int64GetDatum(subentry->apply_error_count); + /* seq_sync_error_count */ + values[i++] = Int64GetDatum(subentry->seq_sync_error_count); + /* sync_error_count */ values[i++] = Int64GetDatum(subentry->sync_error_count); diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 25da769eb359..1128167c0251 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2058,7 +2058,7 @@ }, { name => 'max_sync_workers_per_subscription', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_SUBSCRIBERS', - short_desc => 'Maximum number of table synchronization workers per subscription.', + short_desc => 'Maximum number of workers per subscription for synchronizing tables and sequences.', variable => 'max_sync_workers_per_subscription', boot_val => '2', min => '0', diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9121a382f76b..5cf9e12fcb9a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -3433,7 +3433,7 @@ proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u', prorettype => 'int8', proargtypes => 'regclass', prosrc => 'pg_sequence_last_value' }, -{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump', +{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump and sequence synchronization', proname => 'pg_get_sequence_data', provolatile => 'v', proparallel => 'u', prorettype => 'record', proargtypes => 'regclass', proallargtypes => '{regclass,int8,bool,pg_lsn}', proargmodes => '{i,o,o,o}', @@ -5704,9 +5704,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,seq_sync_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 9f88498ecd3f..9508e553b52a 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -82,6 +82,29 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; +/* + * Holds local sequence identity and corresponding publisher values used during + * sequence synchronization. + */ +typedef struct LogicalRepSequenceInfo +{ + /* Sequence information retrieved from the local node */ + char *seqname; + char *nspname; + Oid localrelid; + + /* Sequence information retrieved from the publisher node */ + XLogRecPtr page_lsn; + int64 last_value; + bool is_called; + + /* + * True if the sequence identified by (nspname, seqname) exists on the + * publisher. + */ + bool found_on_pub; +} LogicalRepSequenceInfo; + extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 9ac0b67683d3..46b4d89dd6ea 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -60,6 +60,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void SequenceChangePersistence(Oid relid, char newrelpersistence); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void SetSequence(Oid relid, int64 next, bool is_called); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *record); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 7ae503e71a27..a0610bb3e316 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -16,6 +16,7 @@ #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ #include "replication/conflict.h" +#include "replication/worker_internal.h" #include "utils/backend_progress.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/backend_status.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/pgstat_kind.h" @@ -108,6 +109,7 @@ typedef struct PgStat_FunctionCallUsage typedef struct PgStat_BackendSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter seq_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; } PgStat_BackendSubEntry; @@ -416,6 +418,7 @@ typedef struct PgStat_SLRUStats typedef struct PgStat_StatSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter seq_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; TimestampTz stat_reset_timestamp; @@ -769,7 +772,8 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void); * Functions in pgstat_subscription.c */ -extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error); +extern void pgstat_report_subscription_error(Oid subid, + LogicalRepWorkerType wtype); extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type); extern void pgstat_create_subscription(Oid subid); extern void pgstat_drop_subscription(Oid subid); diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 88912606e4d5..56fa79b648e7 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); -extern void TablesyncWorkerMain(Datum main_arg); +extern void TableSyncWorkerMain(Datum main_arg); +extern void SequenceSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index e23fa9a4514e..f081619f1513 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, + WORKERTYPE_SEQUENCESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; @@ -106,6 +107,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + TimestampTz last_seqsync_start_time; } LogicalRepWorker; /* @@ -271,6 +274,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); +extern void logicalrep_reset_seqsync_start_time(void); extern int logicalrep_sync_worker_count(Oid subid); extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, @@ -282,11 +286,15 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state); extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); +extern void ProcessSequencesForSync(void); pg_noreturn extern void FinishSyncWorker(void); extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue); +extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, + Oid relid, TimestampTz *last_start_time); extern void ProcessSyncingRelations(XLogRecPtr current_lsn); -extern bool FetchRelationStates(bool *started_tx); +extern void FetchRelationStates(bool *has_pending_subtables, + bool *has_pending_sequences, bool *started_tx); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); @@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); +} + +static inline bool +am_sequencesync_worker(void) +{ + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 77e25ca029e5..fe20f613c3af 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2191,6 +2191,7 @@ pg_stat_subscription| SELECT su.oid AS subid, pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, + ss.seq_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, @@ -2202,7 +2203,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_multiple_unique_conflicts, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, seq_sync_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl index 00a1c2fcd48c..23f3511f9a4f 100644 --- a/src/test/subscription/t/026_stats.pl +++ b/src/test/subscription/t/026_stats.pl @@ -21,7 +21,8 @@ sub create_sub_pub_w_errors { - my ($node_publisher, $node_subscriber, $db, $table_name) = @_; + my ($node_publisher, $node_subscriber, $db, $table_name, $sequence_name) + = @_; # Initial table setup on both publisher and subscriber. On subscriber we # create the same tables but with primary keys. Also, insert some data that # will conflict with the data replicated from publisher later. @@ -32,6 +33,7 @@ sub create_sub_pub_w_errors CREATE TABLE $table_name(a int); ALTER TABLE $table_name REPLICA IDENTITY FULL; INSERT INTO $table_name VALUES (1); + CREATE SEQUENCE $sequence_name; COMMIT; ]); $node_subscriber->safe_psql( @@ -40,45 +42,56 @@ sub create_sub_pub_w_errors BEGIN; CREATE TABLE $table_name(a int primary key); INSERT INTO $table_name VALUES (1); + CREATE SEQUENCE $sequence_name INCREMENT BY 10; COMMIT; ]); # Set up publication. my $pub_name = $table_name . '_pub'; + my $pub_seq_name = $sequence_name . '_pub'; my $publisher_connstr = $node_publisher->connstr . qq( dbname=$db); - $node_publisher->safe_psql($db, - qq(CREATE PUBLICATION $pub_name FOR TABLE $table_name)); + $node_publisher->safe_psql( + $db, + qq[ + CREATE PUBLICATION $pub_name FOR TABLE $table_name; + CREATE PUBLICATION $pub_seq_name FOR ALL SEQUENCES; + ]); # Create subscription. The tablesync for table on subscription will enter into - # infinite error loop due to violating the unique constraint. + # infinite error loop due to violating the unique constraint. The sequencesync + # will also fail due to different sequence increment values on publisher and + # subscriber. my $sub_name = $table_name . '_sub'; $node_subscriber->safe_psql($db, - qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name) + qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name, $pub_seq_name) ); $node_publisher->wait_for_catchup($sub_name); - # Wait for the tablesync error to be reported. + # Wait for the tablesync and sequencesync error to be reported. $node_subscriber->poll_query_until( $db, qq[ - SELECT sync_error_count > 0 - FROM pg_stat_subscription_stats - WHERE subname = '$sub_name' + SELECT count(1) = 1 FROM pg_stat_subscription_stats + WHERE subname = '$sub_name' and seq_sync_error_count > 0 and sync_error_count > 0 ]) or die qq(Timed out while waiting for tablesync errors for subscription '$sub_name'); + # Change the sequence start value on the subscriber so that it doesn't error out. + $node_subscriber->safe_psql($db, + qq(ALTER SEQUENCE $sequence_name INCREMENT 1)); + # Truncate test_tab1 so that tablesync worker can continue. $node_subscriber->safe_psql($db, qq(TRUNCATE $table_name)); - # Wait for initial tablesync to finish. + # Wait for initial sync to finish. $node_subscriber->poll_query_until( $db, qq[ - SELECT count(1) = 1 FROM pg_subscription_rel - WHERE srrelid = '$table_name'::regclass AND srsubstate in ('r', 's') + SELECT count(1) = 2 FROM pg_subscription_rel + WHERE srrelid IN ('$table_name'::regclass, '$sequence_name'::regclass) AND srsubstate in ('r', 's') ]) or die qq(Timed out while waiting for subscriber to synchronize data for table '$table_name'.); @@ -136,14 +149,17 @@ sub create_sub_pub_w_errors # Create the publication and subscription with sync and apply errors my $table1_name = 'test_tab1'; +my $sequence1_name = 'test_seq1'; my ($pub1_name, $sub1_name) = create_sub_pub_w_errors($node_publisher, $node_subscriber, $db, - $table1_name); + $table1_name, $sequence1_name); -# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset +# timestamp is NULL. is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, + seq_sync_error_count > 0, sync_error_count > 0, confl_insert_exists > 0, confl_delete_missing > 0, @@ -151,8 +167,8 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), - qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.) + qq(t|t|t|t|t|t), + qq(Check that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.) ); # Reset a single subscription @@ -160,10 +176,12 @@ sub create_sub_pub_w_errors qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name'))) ); -# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and +# stats_reset timestamp is not NULL. is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, + seq_sync_error_count = 0, sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, @@ -171,8 +189,8 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.) ); # Get reset timestamp @@ -198,14 +216,17 @@ sub create_sub_pub_w_errors # Make second subscription and publication my $table2_name = 'test_tab2'; +my $sequence2_name = 'test_seq2'; my ($pub2_name, $sub2_name) = create_sub_pub_w_errors($node_publisher, $node_subscriber, $db, - $table2_name); + $table2_name, $sequence2_name); -# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 +# and stats_reset timestamp is NULL is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, + seq_sync_error_count > 0, sync_error_count > 0, confl_insert_exists > 0, confl_delete_missing > 0, @@ -213,18 +234,20 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.) ); # Reset all subscriptions $node_subscriber->safe_psql($db, qq(SELECT pg_stat_reset_subscription_stats(NULL))); -# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and +# stats_reset timestamp is not NULL. is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, + seq_sync_error_count = 0, sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, @@ -232,13 +255,14 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.) ); is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, + seq_sync_error_count = 0, sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, @@ -246,8 +270,8 @@ sub create_sub_pub_w_errors FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.) ); $reset_time1 = $node_subscriber->safe_psql($db, diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl index 557fc91c017b..ef40ca979b1e 100644 --- a/src/test/subscription/t/036_sequences.pl +++ b/src/test/subscription/t/036_sequences.pl @@ -1,7 +1,7 @@ # Copyright (c) 2025, PostgreSQL Global Development Group -# This tests that sequences are registered to be synced to the subscriber +# This tests that sequences are synced correctly to the subscriber use strict; use warnings; use PostgreSQL::Test::Cluster; @@ -14,6 +14,7 @@ # Avoid checkpoint during the test, otherwise, extra values will be fetched for # the sequences which will cause the test to fail randomly. $node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'checkpoint_timeout = 1h'); $node_publisher->start; # Initialize subscriber node @@ -28,7 +29,14 @@ ); $node_publisher->safe_psql('postgres', $ddl); -# Setup the same structure on the subscriber +# Setup the same structure on the subscriber, plus some extra sequences that +# we'll create on the publisher later +$ddl = qq( + CREATE TABLE regress_seq_test (v BIGINT); + CREATE SEQUENCE regress_s1; + CREATE SEQUENCE regress_s2; + CREATE SEQUENCE regress_s3; +); $node_subscriber->safe_psql('postgres', $ddl); # Insert initial test data @@ -46,10 +54,165 @@ "CREATE SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_connstr' PUBLICATION regress_seq_pub" ); -# Confirm sequences can be listed in pg_subscription_rel -my $result = $node_subscriber->safe_psql('postgres', - "SELECT relname, srsubstate FROM pg_class, pg_subscription_rel WHERE oid = srrelid" +# Wait for initial sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|0|t', 'initial test data replicated'); + +########## +## ALTER SUBSCRIPTION ... REFRESH PUBLICATION should cause sync of new +# sequences of the publisher, but changes to existing sequences should +# not be synced. +########## + +# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1' +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s2; + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100); +)); + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION; +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|31|t', 'Check sequence value in the publisher'); + +# Check - existing sequence ('regress_s1') is not synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '100|0|t', 'REFRESH PUBLICATION will not sync existing sequence'); + +# Check - newly published sequence ('regress_s2') is synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '100|0|t', + 'REFRESH PUBLICATION will sync newly published sequence'); + +########## +# Test: REFRESH SEQUENCES and REFRESH PUBLICATION (copy_data = off) +# +# 1. ALTER SUBSCRIPTION ... REFRESH SEQUENCES should re-synchronize all +# existing sequences, but not synchronize newly added ones. +# 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION with (copy_data = off) should +# also not update sequence values for newly added sequences. +########## + +# Create a new sequence 'regress_s3', and update the existing sequence +# 'regress_s2'. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s3; + INSERT INTO regress_seq_test SELECT nextval('regress_s3') FROM generate_series(1,100); + + -- Existing sequence + INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100); +)); + +# 1. Do ALTER SUBSCRIPTION ... REFRESH SEQUENCES +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES; +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - existing sequences ('regress_s1' and 'regress_s2') are synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s1; +)); +is($result, '200|0|t', 'REFRESH SEQUENCES will sync existing sequences'); +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s2; +)); +is($result, '200|0|t', 'REFRESH SEQUENCES will sync existing sequences'); + +# Check - newly published sequence ('regress_s3') is not synced +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s3; +)); +is($result, '1|0|f', + 'REFRESH SEQUENCES will not sync newly published sequence'); + +# 2. Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data as false +$result = $node_subscriber->safe_psql( + 'postgres', qq( + ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION WITH (copy_data = false); +)); +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Check - newly published sequence ('regress_s3') is not synced when +# (copy_data = off). +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT last_value, log_cnt, is_called FROM regress_s3; +)); +is($result, '1|0|f', + 'REFRESH PUBLICATION will not sync newly published sequence with copy_data as off' ); -is($result, 'regress_s1|i', "Sequence can be in pg_subscription_rel catalog"); + +########## +# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should report an error when: +# a) sequence definitions differ between the publisher and subscriber, or +# b) a sequence is missing on the publisher. +########## + +# Create a new sequence 'regress_s4' whose START value is not the same in the +# publisher and subscriber. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s4 START 1 INCREMENT 2; +)); + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s4 START 10 INCREMENT 2; +)); + +my $log_offset = -s $node_subscriber->logfile; + +# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION"); + +# Verify that an error is logged for parameter differences on sequence +# ('regress_s4'). +$node_subscriber->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? mismatched or renamed sequence on subscriber \("public.regress_s4"\)\n.*ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"/, + $log_offset); + +# Verify that an error is logged for the missing sequence ('regress_s4'). +$node_publisher->safe_psql('postgres', qq(DROP SEQUENCE regress_s4;)); + +$node_subscriber->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)\n.*ERROR: ( [A-Z0-9]+:)? logical replication sequence synchronization failed for subscription "regress_seq_sub"/, + $log_offset); done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 018b5919cf66..2ca7b75af579 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -526,6 +526,7 @@ CopyMethod CopyMultiInsertBuffer CopyMultiInsertInfo CopyOnErrorChoice +CopySeqResult CopySource CopyStmt CopyToRoutine @@ -1629,6 +1630,7 @@ LogicalRepRelId LogicalRepRelMapEntry LogicalRepRelation LogicalRepRollbackPreparedTxnData +LogicalRepSequenceInfo LogicalRepStreamAbortData LogicalRepTupleData LogicalRepTyp