Skip to content

Commit 8fc2efc

Browse files
author
Commitfest Bot
committed
[CF 5781] v3 - Make wal_receiver_timeout configurable per subscription
This branch was automatically generated by a robot using patches from an email thread registered at: https://commitfest.postgresql.org/patch/5781 The branch will be overwritten each time a new patch version is posted to the thread, and also periodically to check for bitrot caused by changes on the master branch. Patch(es): https://www.postgresql.org/message-id/CAHGQGwG82P4s6tmYK=aEm-T7QfGJBZvXo=WZfckMkffsX6DZjQ@mail.gmail.com Author(s): Fujii Masao
2 parents c9e38a5 + adcaece commit 8fc2efc

File tree

14 files changed

+244
-101
lines changed

14 files changed

+244
-101
lines changed

doc/src/sgml/catalogs.sgml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8162,6 +8162,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
81628162
</para></entry>
81638163
</row>
81648164

8165+
<row>
8166+
<entry role="catalog_table_entry"><para role="column_definition">
8167+
<structfield>subwalrcvtimeout</structfield> <type>text</type>
8168+
</para>
8169+
<para>
8170+
The <varname>wal_receiver_timeout</varname>
8171+
setting for the subscription's workers to use
8172+
</para></entry>
8173+
</row>
8174+
81658175
<row>
81668176
<entry role="catalog_table_entry"><para role="column_definition">
81678177
<structfield>subpublications</structfield> <type>text[]</type>

doc/src/sgml/config.sgml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5154,9 +5154,6 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
51545154
If this value is specified without units, it is taken as milliseconds.
51555155
The default value is 60 seconds.
51565156
A value of zero disables the timeout mechanism.
5157-
This parameter can only be set in
5158-
the <filename>postgresql.conf</filename> file or on the server
5159-
command line.
51605157
</para>
51615158
</listitem>
51625159
</varlistentry>

doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
265265
<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
266266
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
267267
<link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>,
268-
<link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>, and
269-
<link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>.
268+
<link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>,
269+
<link linkend="sql-createsubscription-params-with-max-retention-duration"><literal>max_retention_duration</literal></link>, and
270+
<link linkend="sql-createsubscription-params-with-wal-receiver-timeout"><literal>wal_receiver_timeout</literal></link>.
270271
Only a superuser can set <literal>password_required = false</literal>.
271272
</para>
272273

doc/src/sgml/ref/create_subscription.sgml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,8 +563,21 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
563563
</warning>
564564
</listitem>
565565
</varlistentry>
566-
</variablelist></para>
567566

567+
<varlistentry id="sql-createsubscription-params-with-wal-receiver-timeout">
568+
<term><literal>wal_receiver_timeout</literal> (<type>text</type>)</term>
569+
<listitem>
570+
<para>
571+
The value of this parameter overrides the
572+
<xref linkend="guc-wal-receiver-timeout"/> setting within this
573+
subscription's apply worker processes. The default value is
574+
<literal>-1</literal>, which means it does not override the global setting,
575+
i.e., the value from the server configuration, command line, role or
576+
database settings will be used instead.
577+
</para>
578+
</listitem>
579+
</varlistentry>
580+
</variablelist></para>
568581
</listitem>
569582
</varlistentry>
570583
</variablelist>

src/backend/catalog/pg_subscription.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@ GetSubscription(Oid subid, bool missing_ok)
129129
Anum_pg_subscription_subsynccommit);
130130
sub->synccommit = TextDatumGetCString(datum);
131131

132+
/* Get walrcvtimeout */
133+
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
134+
tup,
135+
Anum_pg_subscription_subwalrcvtimeout);
136+
sub->walrcvtimeout = TextDatumGetCString(datum);
137+
132138
/* Get publications */
133139
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
134140
tup,

src/backend/commands/subscriptioncmds.c

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,9 @@
7373
#define SUBOPT_FAILOVER 0x00002000
7474
#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
7575
#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
76-
#define SUBOPT_LSN 0x00010000
77-
#define SUBOPT_ORIGIN 0x00020000
76+
#define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000
77+
#define SUBOPT_LSN 0x00020000
78+
#define SUBOPT_ORIGIN 0x00040000
7879

7980
/* check if the 'val' has 'bits' set */
8081
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -104,6 +105,7 @@ typedef struct SubOpts
104105
int32 maxretention;
105106
char *origin;
106107
XLogRecPtr lsn;
108+
char *wal_receiver_timeout;
107109
} SubOpts;
108110

109111
/*
@@ -402,6 +404,30 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
402404
opts->specified_opts |= SUBOPT_LSN;
403405
opts->lsn = lsn;
404406
}
407+
else if (IsSet(supported_opts, SUBOPT_WAL_RECEIVER_TIMEOUT) &&
408+
strcmp(defel->defname, "wal_receiver_timeout") == 0)
409+
{
410+
bool parsed;
411+
int val;
412+
413+
if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
414+
errorConflictingDefElem(defel, pstate);
415+
416+
opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
417+
opts->wal_receiver_timeout = defGetString(defel);
418+
419+
/*
420+
* Test if the given value is valid for wal_receiver_timeeout GUC.
421+
* Skip this test if the value is -1, since -1 is allowed for the
422+
* wal_receiver_timeout subscription option, but not for the GUC
423+
* itself.
424+
*/
425+
parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
426+
if (!parsed || val != -1)
427+
(void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
428+
PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
429+
false, 0, false);
430+
}
405431
else
406432
ereport(ERROR,
407433
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -610,7 +636,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
610636
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
611637
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
612638
SUBOPT_RETAIN_DEAD_TUPLES |
613-
SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
639+
SUBOPT_MAX_RETENTION_DURATION |
640+
SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN);
614641
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
615642

616643
/*
@@ -693,6 +720,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
693720
if (opts.synchronous_commit == NULL)
694721
opts.synchronous_commit = "off";
695722

723+
/*
724+
* The default for wal_receiver_timeout of subscriptions is -1, which
725+
* means the value is inherited from the server configuration, command
726+
* line, or role/database settings.
727+
*/
728+
if (opts.wal_receiver_timeout == NULL)
729+
opts.wal_receiver_timeout = "-1";
730+
696731
conninfo = stmt->conninfo;
697732
publications = stmt->publication;
698733

@@ -740,6 +775,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
740775
nulls[Anum_pg_subscription_subslotname - 1] = true;
741776
values[Anum_pg_subscription_subsynccommit - 1] =
742777
CStringGetTextDatum(opts.synchronous_commit);
778+
values[Anum_pg_subscription_subwalrcvtimeout - 1] =
779+
CStringGetTextDatum(opts.wal_receiver_timeout);
743780
values[Anum_pg_subscription_subpublications - 1] =
744781
publicationListToArray(publications);
745782
values[Anum_pg_subscription_suborigin - 1] =
@@ -1408,6 +1445,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
14081445
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
14091446
SUBOPT_RETAIN_DEAD_TUPLES |
14101447
SUBOPT_MAX_RETENTION_DURATION |
1448+
SUBOPT_WAL_RECEIVER_TIMEOUT |
14111449
SUBOPT_ORIGIN);
14121450

14131451
parse_subscription_options(pstate, stmt->options,
@@ -1663,6 +1701,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
16631701
origin = opts.origin;
16641702
}
16651703

1704+
if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1705+
{
1706+
values[Anum_pg_subscription_subwalrcvtimeout - 1] =
1707+
CStringGetTextDatum(opts.wal_receiver_timeout);
1708+
replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true;
1709+
}
1710+
16661711
update_tuple = true;
16671712
break;
16681713
}

src/backend/replication/logical/worker.c

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,8 @@ static inline void reset_apply_error_context_info(void);
627627
static TransApplyAction get_transaction_apply_action(TransactionId xid,
628628
ParallelApplyWorkerInfo **winfo);
629629

630+
static void set_wal_receiver_timeout(void);
631+
630632
static void replorigin_reset(int code, Datum arg);
631633

632634
/*
@@ -5128,12 +5130,46 @@ maybe_reread_subscription(void)
51285130
SetConfigOption("synchronous_commit", MySubscription->synccommit,
51295131
PGC_BACKEND, PGC_S_OVERRIDE);
51305132

5133+
/* Change wal_receiver_timeout according to the user's wishes */
5134+
set_wal_receiver_timeout();
5135+
51315136
if (started_tx)
51325137
CommitTransactionCommand();
51335138

51345139
MySubscriptionValid = true;
51355140
}
51365141

5142+
/*
5143+
* Change wal_receiver_timeout to MySubscription->walrcvtimeout.
5144+
*/
5145+
static void
5146+
set_wal_receiver_timeout(void)
5147+
{
5148+
bool parsed;
5149+
int val;
5150+
5151+
/*
5152+
* Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5153+
* which comes from the subscription's wal_receiver_timeout option. If the
5154+
* value is -1, reset the GUC to its default, meaning it will inherit from
5155+
* the server config, command line, or role/database settings.
5156+
*/
5157+
parsed = parse_int(MySubscription->walrcvtimeout, &val, 0, NULL);
5158+
if (parsed && val == -1)
5159+
SetConfigOption("wal_receiver_timeout", NULL,
5160+
PGC_BACKEND, PGC_S_SESSION);
5161+
else
5162+
SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5163+
PGC_BACKEND, PGC_S_SESSION);
5164+
5165+
/*
5166+
* Log the current wal_receiver_timeout GUC value (in milliseconds) as a
5167+
* debug message to verify it was set correctly.
5168+
*/
5169+
elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5170+
MySubscription->name, wal_receiver_timeout);
5171+
}
5172+
51375173
/*
51385174
* Callback from subscription syscache invalidation.
51395175
*/
@@ -5795,6 +5831,9 @@ InitializeLogRepWorker(void)
57955831
SetConfigOption("synchronous_commit", MySubscription->synccommit,
57965832
PGC_BACKEND, PGC_S_OVERRIDE);
57975833

5834+
/* Change wal_receiver_timeout according to the user's wishes */
5835+
set_wal_receiver_timeout();
5836+
57985837
/*
57995838
* Keep us informed about subscription or role changes. Note that the
58005839
* role's superuser privilege can be revoked.

src/backend/utils/misc/guc_parameters.dat

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1059,7 +1059,7 @@
10591059
max => 'INT_MAX / 1000',
10601060
},
10611061

1062-
{ name => 'wal_receiver_timeout', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
1062+
{ name => 'wal_receiver_timeout', type => 'int', context => 'PGC_USERSET', group => 'REPLICATION_STANDBY',
10631063
short_desc => 'Sets the maximum wait time to receive data from the sending server.',
10641064
long_desc => '0 disables the timeout.',
10651065
flags => 'GUC_UNIT_MS',

src/bin/pg_dump/pg_dump.c

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5109,6 +5109,7 @@ getSubscriptions(Archive *fout)
51095109
int i_subconninfo;
51105110
int i_subslotname;
51115111
int i_subsynccommit;
5112+
int i_subwalrcvtimeout;
51125113
int i_subpublications;
51135114
int i_suborigin;
51145115
int i_suboriginremotelsn;
@@ -5202,10 +5203,17 @@ getSubscriptions(Archive *fout)
52025203

52035204
if (fout->remoteVersion >= 190000)
52045205
appendPQExpBufferStr(query,
5205-
" s.submaxretention\n");
5206+
" s.submaxretention,\n");
52065207
else
52075208
appendPQExpBuffer(query,
5208-
" 0 AS submaxretention\n");
5209+
" 0 AS submaxretention,\n");
5210+
5211+
if (fout->remoteVersion >= 190000)
5212+
appendPQExpBufferStr(query,
5213+
" s.subwalrcvtimeout\n");
5214+
else
5215+
appendPQExpBufferStr(query,
5216+
" '-1' AS subwalrcvtimeout\n");
52095217

52105218
appendPQExpBufferStr(query,
52115219
"FROM pg_subscription s\n");
@@ -5244,6 +5252,7 @@ getSubscriptions(Archive *fout)
52445252
i_subconninfo = PQfnumber(res, "subconninfo");
52455253
i_subslotname = PQfnumber(res, "subslotname");
52465254
i_subsynccommit = PQfnumber(res, "subsynccommit");
5255+
i_subwalrcvtimeout = PQfnumber(res, "subwalrcvtimeout");
52475256
i_subpublications = PQfnumber(res, "subpublications");
52485257
i_suborigin = PQfnumber(res, "suborigin");
52495258
i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
@@ -5287,6 +5296,8 @@ getSubscriptions(Archive *fout)
52875296
pg_strdup(PQgetvalue(res, i, i_subslotname));
52885297
subinfo[i].subsynccommit =
52895298
pg_strdup(PQgetvalue(res, i, i_subsynccommit));
5299+
subinfo[i].subwalrcvtimeout =
5300+
pg_strdup(PQgetvalue(res, i, i_subwalrcvtimeout));
52905301
subinfo[i].subpublications =
52915302
pg_strdup(PQgetvalue(res, i, i_subpublications));
52925303
subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
@@ -5545,6 +5556,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
55455556
if (strcmp(subinfo->subsynccommit, "off") != 0)
55465557
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
55475558

5559+
if (strcmp(subinfo->subwalrcvtimeout, "-1") != 0)
5560+
appendPQExpBuffer(query, ", wal_receiver_timeout = %s", fmtId(subinfo->subwalrcvtimeout));
5561+
55485562
if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
55495563
appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
55505564

src/bin/pg_dump/pg_dump.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,7 @@ typedef struct _SubscriptionInfo
722722
char *subconninfo;
723723
char *subslotname;
724724
char *subsynccommit;
725+
char *subwalrcvtimeout;
725726
char *subpublications;
726727
char *suborigin;
727728
char *suboriginremotelsn;

0 commit comments

Comments
 (0)