Skip to content

Commit 5965c0a

Browse files
author
Commitfest Bot
committed
[CF 6019] v2 - Implement batching for walsender notifications during logical cascade replication
This branch was automatically generated by a robot using patches from an email thread registered at: https://commitfest.postgresql.org/patch/6019 The branch will be overwritten each time a new patch version is posted to the thread, and also periodically to check for bitrot caused by changes on the master branch. Patch(es): https://www.postgresql.org/message-id/[email protected] Author(s): Alexey Makhmutov
2 parents b70cafd + 0ed5157 commit 5965c0a

File tree

7 files changed

+167
-2
lines changed

7 files changed

+167
-2
lines changed

src/backend/access/transam/xlogrecovery.c

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
#include "utils/pg_lsn.h"
6666
#include "utils/ps_status.h"
6767
#include "utils/pg_rusage.h"
68+
#include "utils/timeout.h"
6869

6970
/* Unsupported old recovery command file names (relative to $PGDATA) */
7071
#define RECOVERY_COMMAND_FILE "recovery.conf"
@@ -148,6 +149,13 @@ bool InArchiveRecovery = false;
148149
static bool StandbyModeRequested = false;
149150
bool StandbyMode = false;
150151

152+
/*
153+
* Whether we are currently in the process of applying recovery records while
154+
* allowing downstream replication instances.
*
* Evaluates to true only when called in the startup process, with StandbyMode
* set and AllowCascadeReplication() returning true.
155+
*/
156+
#define StandbyWithCascadeReplication() \
157+
(AmStartupProcess() && StandbyMode && AllowCascadeReplication())
158+
151159
/* was a signal file present at startup? */
152160
static bool standby_signal_file_found = false;
153161
static bool recovery_signal_file_found = false;
@@ -304,6 +312,28 @@ bool reachedConsistency = false;
304312
static char *replay_image_masked = NULL;
305313
static char *primary_image_masked = NULL;
306314

315+
/*
316+
* Maximum number of applied records in a batch before notifying walsenders
317+
* during cascade replication (GUC cascade_replication_batch_size).
318+
*/
319+
int cascadeReplicationMaxBatchSize;
320+
321+
/*
322+
* Maximum batching delay before notifying walsenders during cascade
* replication (GUC cascade_replication_batch_delay, in milliseconds).
323+
*/
324+
int cascadeReplicationMaxBatchDelay;
325+
326+
/*
* Batching delay that was in effect when the timer was last armed. A value
* greater than zero means the timer may be active and has to be disabled
* when the batch is flushed.
*/
327+
static int cascadeDelayCurrent = 0;
328+
329+
/* Number of applied records which have not yet been signaled to walsenders */
330+
static int appliedRecords = 0;
331+
332+
/*
333+
* True if downstream walsenders need to be notified about pending WAL records.
334+
* Set by the timeout handler and by the GUC assign hook for the batching
* parameters; cleared once the batch has been flushed.
335+
*/
336+
volatile sig_atomic_t replicationNotificationPending = false;
307337

308338
/*
309339
* Shared-memory state for WAL recovery.
@@ -1846,6 +1876,15 @@ PerformWalRecovery(void)
18461876
* end of main redo apply loop
18471877
*/
18481878

1879+
/* Send notification for batched records once the redo loop has ended */
1880+
if (StandbyWithCascadeReplication() && appliedRecords > 0)
1881+
{
1882+
if (cascadeDelayCurrent > 0)
1883+
disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
1884+
appliedRecords = 0;
1885+
WalSndWakeup(false, true);
1886+
}
1887+
18491888
if (reachedRecoveryTarget)
18501889
{
18511890
if (!reachedConsistency)
@@ -2044,8 +2083,45 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
20442083
* be created otherwise)
20452084
* ------
20462085
*/
2047-
if (AllowCascadeReplication())
2048-
WalSndWakeup(switchedTLI, true);
2086+
2087+
if (StandbyWithCascadeReplication())
2088+
{
2089+
if (cascadeReplicationMaxBatchSize <= 1 && appliedRecords == 0)
2090+
WalSndWakeup(switchedTLI, true);
2091+
else
2092+
{
2093+
/*
2094+
* If the timeline has switched, then we will immediately notify both
2095+
* physical and logical downstream walsenders here, as we do not
2096+
* want to introduce additional delay in such case. Otherwise we
2097+
* will wait until we apply specified number of records before
2098+
* notifying downstream logical walsenders.
2099+
*/
2100+
bool batchFlushRequired =
2101+
++appliedRecords >= cascadeReplicationMaxBatchSize ||
2102+
replicationNotificationPending ||
2103+
switchedTLI;
2104+
2105+
if (batchFlushRequired)
2106+
{
2107+
if (cascadeDelayCurrent > 0)
2108+
disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
2109+
appliedRecords = 0;
2110+
replicationNotificationPending = false;
2111+
}
2112+
2113+
WalSndWakeup(switchedTLI, batchFlushRequired);
2114+
2115+
/* Setup timeout to limit maximum delay for notifications */
2116+
if (appliedRecords == 1)
2117+
{
2118+
cascadeDelayCurrent = cascadeReplicationMaxBatchDelay;
2119+
if (cascadeDelayCurrent > 0)
2120+
enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
2121+
cascadeDelayCurrent);
2122+
}
2123+
}
2124+
}
20492125

20502126
/*
20512127
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
@@ -5095,3 +5171,50 @@ assign_recovery_target_xid(const char *newval, void *extra)
50955171
else
50965172
recoveryTarget = RECOVERY_TARGET_UNSET;
50975173
}
5174+
5175+
/*
5176+
* GUC assign_hook shared by cascade_replication_batch_size and
5177+
* cascade_replication_batch_delay.
*
* The new value itself is not inspected here (new_value and extra are
* unused); the hook only cancels an armed batching timer and requests a
* flush of the current batch.
5178+
*/
5179+
void
5180+
assign_cascade_replication_batch_values(int new_value, void *extra)
5181+
{
5182+
/*
5183+
* If either cascade_replication_batch_size or
5184+
* cascade_replication_batch_delay is changed, then we want to disable
5185+
* the current timer (if any) and request a flush of the current batch.
5186+
* The new values will be picked up once the next WAL record is applied.
5187+
*/
5188+
if (cascadeDelayCurrent > 0)
5189+
{
5190+
cascadeDelayCurrent = 0;
5191+
disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
5192+
}
5193+
/* The flag is checked and acted upon in ProcessStartupProcInterrupts */
5194+
replicationNotificationPending = true;
5195+
}
5196+
5197+
/*
5198+
* Send notifications to downstream walsenders if there are batched records.
*
* Called from ProcessStartupProcInterrupts when
* replicationNotificationPending is set. Resets the counter of batched
* records after the wakeup and always clears the pending flag, even when
* there was nothing to flush.
5199+
*/
5200+
void
5201+
StandbyWalCheckSendNotify(void)
5202+
{
5203+
if (appliedRecords > 0)
5204+
{
/* NOTE(review): presumably wakes only logical walsenders - confirm against WalSndWakeup */
5205+
WalSndWakeup(false, true);
5206+
appliedRecords = 0;
5207+
}
5208+
replicationNotificationPending = false;
5209+
}
5210+
5211+
/*
5212+
* Timer handler for batch notifications in cascade replication.
*
* Registered for STANDBY_CASCADE_WAL_SEND_TIMEOUT in StartupProcessMain.
* It only sets the pending flag and wakes up recovery; the actual
* notification is sent later from ProcessStartupProcInterrupts.
5213+
*/
5214+
void
5215+
StandbyWalSendTimeoutHandler(void)
5216+
{
5217+
replicationNotificationPending = true;
5218+
/* The process is most likely waiting for WAL records to arrive, so wake it */
5219+
WakeupRecovery();
5220+
}

src/backend/postmaster/startup.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,10 @@ ProcessStartupProcInterrupts(void)
189189
if (ProcSignalBarrierPending)
190190
ProcessProcSignalBarrier();
191191

192+
/* Send a notification to downstream walsenders if required */
193+
if (replicationNotificationPending)
194+
StandbyWalCheckSendNotify();
195+
192196
/* Perform logging of memory contexts of this process */
193197
if (LogMemoryContextPending)
194198
ProcessLogMemoryContextInterrupt();
@@ -246,6 +250,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len)
246250
RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler);
247251
RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler);
248252
RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler);
253+
RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler);
249254

250255
/*
251256
* Unblock signals (they were blocked when the postmaster forked us)

src/backend/utils/misc/guc_parameters.dat

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,6 +1069,27 @@
10691069
max => 'INT_MAX',
10701070
},
10711071

1072+
{ name => 'cascade_replication_batch_size', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
1073+
short_desc => 'Sets the maximum number of applied WAL records before cascade walsenders are notified on standby.',
1074+
long_desc => '0 disables records batching in cascade replication.',
1075+
variable => 'cascadeReplicationMaxBatchSize',
1076+
boot_val => '0',
1077+
min => '0',
1078+
max => 'INT_MAX',
1079+
assign_hook => 'assign_cascade_replication_batch_values',
1080+
},
1081+
1082+
{ name => 'cascade_replication_batch_delay', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
1083+
short_desc => 'Sets the maximum time before cascade walsenders are notified on standby about applied records.',
1084+
long_desc => '0 disables timed notifications. This option works only if cascade_replication_batch_size is greater than 0.',
1085+
flags => 'GUC_UNIT_MS',
1086+
variable => 'cascadeReplicationMaxBatchDelay',
1087+
boot_val => '500',
1088+
min => '0',
1089+
max => 'INT_MAX',
1090+
assign_hook => 'assign_cascade_replication_batch_values',
1091+
},
1092+
10721093
{ name => 'max_connections', type => 'int', context => 'PGC_POSTMASTER', group => 'CONN_AUTH_SETTINGS',
10731094
short_desc => 'Sets the maximum number of concurrent connections.',
10741095
variable => 'MaxConnections',

src/backend/utils/misc/postgresql.conf.sample

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,12 @@
376376
# is not set
377377
#wal_receiver_status_interval = 10s # send replies at least this often
378378
# 0 disables
379+
#cascade_replication_batch_size = 0 # maximum number of applied WAL records
380+
# before cascade walsenders are notified on standby
381+
# 0 disables records batching in cascade replication
382+
#cascade_replication_batch_delay = 500ms # maximum time before cascade walsenders
383+
# are notified about applied records if batching is enabled
384+
# 0 disables timed notifications during batching
379385
#hot_standby_feedback = off # send info from standby to prevent
380386
# query conflicts
381387
#wal_receiver_timeout = 60s # time that receiver waits for

src/include/access/xlogrecovery.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#ifndef XLOGRECOVERY_H
1212
#define XLOGRECOVERY_H
1313

14+
#include <signal.h>
15+
1416
#include "access/xlogreader.h"
1517
#include "catalog/pg_control.h"
1618
#include "lib/stringinfo.h"
@@ -67,6 +69,8 @@ extern PGDLLIMPORT char *PrimarySlotName;
6769
extern PGDLLIMPORT char *recoveryRestoreCommand;
6870
extern PGDLLIMPORT char *recoveryEndCommand;
6971
extern PGDLLIMPORT char *archiveCleanupCommand;
72+
extern PGDLLIMPORT int cascadeReplicationMaxBatchSize;
73+
extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay;
7074

7175
/* indirectly set via GUC system */
7276
extern PGDLLIMPORT TransactionId recoveryTargetXid;
@@ -165,4 +169,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
165169

166170
extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
167171

172+
extern PGDLLIMPORT volatile sig_atomic_t replicationNotificationPending;
173+
extern void StandbyWalCheckSendNotify(void);
174+
extern void StandbyWalSendTimeoutHandler(void);
175+
168176
#endif /* XLOGRECOVERY_H */

src/include/utils/guc_hooks.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ extern void assign_recovery_target_timeline(const char *newval, void *extra);
118118
extern bool check_recovery_target_xid(char **newval, void **extra,
119119
GucSource source);
120120
extern void assign_recovery_target_xid(const char *newval, void *extra);
121+
extern void assign_cascade_replication_batch_values(int new_value, void *extra);
121122
extern bool check_role(char **newval, void **extra, GucSource source);
122123
extern void assign_role(const char *newval, void *extra);
123124
extern const char *show_role(void);

src/include/utils/timeout.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ typedef enum TimeoutId
3636
IDLE_STATS_UPDATE_TIMEOUT,
3737
CLIENT_CONNECTION_CHECK_TIMEOUT,
3838
STARTUP_PROGRESS_TIMEOUT,
39+
STANDBY_CASCADE_WAL_SEND_TIMEOUT,
3940
/* First user-definable timeout reason */
4041
USER_TIMEOUT,
4142
/* Maximum number of timeout reasons */

0 commit comments

Comments
 (0)