Skip to content

Commit 0ed5157

Browse files
Rustam KhamidullinAlexey Makhmutov
authored andcommitted
Implement batching for WAL records notification during cascade replication
Currently standby server notifies walsenders after applying of each WAL record during cascade replication. This creates a bottleneck in case of large number of sender processes during WalSndWakeup invocation. This change introduces batching for such notifications, which are now sent either after certain number of applied records or specified time interval (whichever comes first). Co-authored-by: Alexey Makhmutov <[email protected]>
1 parent b70cafd commit 0ed5157

File tree

7 files changed

+167
-2
lines changed

7 files changed

+167
-2
lines changed

src/backend/access/transam/xlogrecovery.c

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
#include "utils/pg_lsn.h"
6666
#include "utils/ps_status.h"
6767
#include "utils/pg_rusage.h"
68+
#include "utils/timeout.h"
6869

6970
/* Unsupported old recovery command file names (relative to $PGDATA) */
7071
#define RECOVERY_COMMAND_FILE "recovery.conf"
@@ -148,6 +149,13 @@ bool InArchiveRecovery = false;
148149
static bool StandbyModeRequested = false;
149150
bool StandbyMode = false;
150151

152+
/*
153+
* Whether we are currently in process of processing recovery records while
154+
* allowing downstream replication instances
155+
*/
156+
#define StandbyWithCascadeReplication() \
157+
(AmStartupProcess() && StandbyMode && AllowCascadeReplication())
158+
151159
/* was a signal file present at startup? */
152160
static bool standby_signal_file_found = false;
153161
static bool recovery_signal_file_found = false;
@@ -304,6 +312,28 @@ bool reachedConsistency = false;
304312
static char *replay_image_masked = NULL;
305313
static char *primary_image_masked = NULL;
306314

315+
/*
316+
* Maximum number of applied records in batch before notifying walsender during
317+
* cascade replication
318+
*/
319+
int cascadeReplicationMaxBatchSize;
320+
321+
/*
322+
* Maximum batching delay before notifying walsender during cascade replication
323+
*/
324+
int cascadeReplicationMaxBatchDelay;
325+
326+
/* Current cascade replication batching delay used while enabling timer */
327+
static int cascadeDelayCurrent = 0;
328+
329+
/* Counter for applied records which are not yet signaled to walsenders */
330+
static int appliedRecords = 0;
331+
332+
/*
333+
* True if downstream walsenders need to be notified about pending WAL records,
334+
* set by timeout handler.
335+
*/
336+
volatile sig_atomic_t replicationNotificationPending = false;
307337

308338
/*
309339
* Shared-memory state for WAL recovery.
@@ -1846,6 +1876,15 @@ PerformWalRecovery(void)
18461876
* end of main redo apply loop
18471877
*/
18481878

1879+
/* Send notification for batched messages once loop is ended */
1880+
if (StandbyWithCascadeReplication() && appliedRecords > 0)
1881+
{
1882+
if (cascadeDelayCurrent > 0)
1883+
disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
1884+
appliedRecords = 0;
1885+
WalSndWakeup(false, true);
1886+
}
1887+
18491888
if (reachedRecoveryTarget)
18501889
{
18511890
if (!reachedConsistency)
@@ -2044,8 +2083,45 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
20442083
* be created otherwise)
20452084
* ------
20462085
*/
2047-
if (AllowCascadeReplication())
2048-
WalSndWakeup(switchedTLI, true);
2086+
2087+
if (StandbyWithCascadeReplication())
2088+
{
2089+
if (cascadeReplicationMaxBatchSize <= 1 && appliedRecords == 0)
2090+
WalSndWakeup(switchedTLI, true);
2091+
else
2092+
{
2093+
/*
2094+
* If time line has switched, then we will imediately notify both
2095+
* physical and logical downstream walsenders here, as we do not
2096+
* want to introduce additional delay in such case. Otherwise we
2097+
* will wait until we apply specified number of records before
2098+
* notifying downstream logical walsenders.
2099+
*/
2100+
bool batchFlushRequired =
2101+
++appliedRecords >= cascadeReplicationMaxBatchSize ||
2102+
replicationNotificationPending ||
2103+
switchedTLI;
2104+
2105+
if (batchFlushRequired)
2106+
{
2107+
if (cascadeDelayCurrent > 0)
2108+
disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
2109+
appliedRecords = 0;
2110+
replicationNotificationPending = false;
2111+
}
2112+
2113+
WalSndWakeup(switchedTLI, batchFlushRequired);
2114+
2115+
/* Setup timeout to limit maximum delay for notifications */
2116+
if (appliedRecords == 1)
2117+
{
2118+
cascadeDelayCurrent = cascadeReplicationMaxBatchDelay;
2119+
if (cascadeDelayCurrent > 0)
2120+
enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
2121+
cascadeDelayCurrent);
2122+
}
2123+
}
2124+
}
20492125

20502126
/*
20512127
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
@@ -5095,3 +5171,50 @@ assign_recovery_target_xid(const char *newval, void *extra)
50955171
else
50965172
recoveryTarget = RECOVERY_TARGET_UNSET;
50975173
}
5174+
5175+
/*
5176+
* GUC assign_hook for cascade_replication_batch_size and
5177+
* cascade_replication_batch_delay
5178+
*/
5179+
void
5180+
assign_cascade_replication_batch_values(int new_value, void *extra)
5181+
{
5182+
/*
5183+
* If either cascade_replication_batch_size or
5184+
* cascade_replication_batch_delay is changed, then we want to disable
5185+
* current timer (if any) and immediately flush current batch. New values
5186+
* will be picked once next WAL record is applied.
5187+
*/
5188+
if (cascadeDelayCurrent > 0)
5189+
{
5190+
cascadeDelayCurrent = 0;
5191+
disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
5192+
}
5193+
/* Will be processed by ProcessStartupProcInterrupts */
5194+
replicationNotificationPending = true;
5195+
}
5196+
5197+
/*
5198+
* Send notifications to downstream walsenders if there are batched records
5199+
*/
5200+
void
5201+
StandbyWalCheckSendNotify(void)
5202+
{
5203+
if (appliedRecords > 0)
5204+
{
5205+
WalSndWakeup(false, true);
5206+
appliedRecords = 0;
5207+
}
5208+
replicationNotificationPending = false;
5209+
}
5210+
5211+
/*
5212+
* Timer handler for batch notifications in cascade replication
5213+
*/
5214+
void
5215+
StandbyWalSendTimeoutHandler(void)
5216+
{
5217+
replicationNotificationPending = true;
5218+
/* Most likely process is waiting for arrival of WAL records */
5219+
WakeupRecovery();
5220+
}

src/backend/postmaster/startup.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,10 @@ ProcessStartupProcInterrupts(void)
189189
if (ProcSignalBarrierPending)
190190
ProcessProcSignalBarrier();
191191

192+
/* Send a notification to downstream walsenders if required */
193+
if (replicationNotificationPending)
194+
StandbyWalCheckSendNotify();
195+
192196
/* Perform logging of memory contexts of this process */
193197
if (LogMemoryContextPending)
194198
ProcessLogMemoryContextInterrupt();
@@ -246,6 +250,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len)
246250
RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler);
247251
RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler);
248252
RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler);
253+
RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler);
249254

250255
/*
251256
* Unblock signals (they were blocked when the postmaster forked us)

src/backend/utils/misc/guc_parameters.dat

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,6 +1069,27 @@
10691069
max => 'INT_MAX',
10701070
},
10711071

1072+
{ name => 'cascade_replication_batch_size', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
1073+
short_desc => 'Sets the maximum number of applied WAL records before cascade walsenders are notified on standby.',
1074+
long_desc => '0 disables records batching in cascade replication.',
1075+
variable => 'cascadeReplicationMaxBatchSize',
1076+
boot_val => '0',
1077+
min => '0',
1078+
max => 'INT_MAX',
1079+
assign_hook => 'assign_cascade_replication_batch_values',
1080+
},
1081+
1082+
{ name => 'cascade_replication_batch_delay', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
1083+
short_desc => 'Sets the maximum time before cascade walsenders are notified on standby about applied records.',
1084+
long_desc => '0 disables timed notifications. This option works only if cascade_replication_batch_size is greater than 0.',
1085+
flags => 'GUC_UNIT_MS',
1086+
variable => 'cascadeReplicationMaxBatchDelay',
1087+
boot_val => '500',
1088+
min => '0',
1089+
max => 'INT_MAX',
1090+
assign_hook => 'assign_cascade_replication_batch_values',
1091+
},
1092+
10721093
{ name => 'max_connections', type => 'int', context => 'PGC_POSTMASTER', group => 'CONN_AUTH_SETTINGS',
10731094
short_desc => 'Sets the maximum number of concurrent connections.',
10741095
variable => 'MaxConnections',

src/backend/utils/misc/postgresql.conf.sample

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,12 @@
376376
# is not set
377377
#wal_receiver_status_interval = 10s # send replies at least this often
378378
# 0 disables
379+
#cascade_replication_batch_size = 0 # maximum number of applied WAL records
380+
# before cascade walsenders are notified on standby
381+
# 0 disables records batching in cascade replication
382+
#cascade_replication_batch_delay = 500ms # maximum time before cascade walsenders
383+
# are notified about applied records if batching is enabled
384+
# 0 disables timed notifications during batching
379385
#hot_standby_feedback = off # send info from standby to prevent
380386
# query conflicts
381387
#wal_receiver_timeout = 60s # time that receiver waits for

src/include/access/xlogrecovery.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#ifndef XLOGRECOVERY_H
1212
#define XLOGRECOVERY_H
1313

14+
#include <signal.h>
15+
1416
#include "access/xlogreader.h"
1517
#include "catalog/pg_control.h"
1618
#include "lib/stringinfo.h"
@@ -67,6 +69,8 @@ extern PGDLLIMPORT char *PrimarySlotName;
6769
extern PGDLLIMPORT char *recoveryRestoreCommand;
6870
extern PGDLLIMPORT char *recoveryEndCommand;
6971
extern PGDLLIMPORT char *archiveCleanupCommand;
72+
extern PGDLLIMPORT int cascadeReplicationMaxBatchSize;
73+
extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay;
7074

7175
/* indirectly set via GUC system */
7276
extern PGDLLIMPORT TransactionId recoveryTargetXid;
@@ -165,4 +169,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
165169

166170
extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
167171

172+
extern PGDLLIMPORT volatile sig_atomic_t replicationNotificationPending;
173+
extern void StandbyWalCheckSendNotify(void);
174+
extern void StandbyWalSendTimeoutHandler(void);
175+
168176
#endif /* XLOGRECOVERY_H */

src/include/utils/guc_hooks.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ extern void assign_recovery_target_timeline(const char *newval, void *extra);
118118
extern bool check_recovery_target_xid(char **newval, void **extra,
119119
GucSource source);
120120
extern void assign_recovery_target_xid(const char *newval, void *extra);
121+
extern void assign_cascade_replication_batch_values(int new_value, void *extra);
121122
extern bool check_role(char **newval, void **extra, GucSource source);
122123
extern void assign_role(const char *newval, void *extra);
123124
extern const char *show_role(void);

src/include/utils/timeout.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ typedef enum TimeoutId
3636
IDLE_STATS_UPDATE_TIMEOUT,
3737
CLIENT_CONNECTION_CHECK_TIMEOUT,
3838
STARTUP_PROGRESS_TIMEOUT,
39+
STANDBY_CASCADE_WAL_SEND_TIMEOUT,
3940
/* First user-definable timeout reason */
4041
USER_TIMEOUT,
4142
/* Maximum number of timeout reasons */

0 commit comments

Comments
 (0)