From 0ed5157de325709ee4b76ad826982ea75ec300fb Mon Sep 17 00:00:00 2001 From: Rustam Khamidullin Date: Fri, 14 Mar 2025 18:18:34 +0700 Subject: [PATCH] Implement batching for WAL records notification during cascade replication Currently, a standby server notifies walsenders after applying each WAL record during cascade replication. This creates a bottleneck in WalSndWakeup when there is a large number of sender processes. This change introduces batching for such notifications, which are now sent either after a certain number of applied records or after a specified time interval (whichever comes first). Co-authored-by: Alexey Makhmutov --- src/backend/access/transam/xlogrecovery.c | 127 +++++++++++++++++- src/backend/postmaster/startup.c | 5 + src/backend/utils/misc/guc_parameters.dat | 21 +++ src/backend/utils/misc/postgresql.conf.sample | 6 + src/include/access/xlogrecovery.h | 8 ++ src/include/utils/guc_hooks.h | 1 + src/include/utils/timeout.h | 1 + 7 files changed, 167 insertions(+), 2 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 3e3c4da01a24..a71a6de0b520 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -65,6 +65,7 @@ #include "utils/pg_lsn.h" #include "utils/ps_status.h" #include "utils/pg_rusage.h" +#include "utils/timeout.h" /* Unsupported old recovery command file names (relative to $PGDATA) */ #define RECOVERY_COMMAND_FILE "recovery.conf" @@ -148,6 +149,13 @@ bool InArchiveRecovery = false; static bool StandbyModeRequested = false; bool StandbyMode = false; +/* + * Whether we are currently in the process of applying recovery records while + * allowing downstream replication instances + */ +#define StandbyWithCascadeReplication() \ + (AmStartupProcess() && StandbyMode && AllowCascadeReplication()) + /* was a signal file present at startup? 
*/ static bool standby_signal_file_found = false; static bool recovery_signal_file_found = false; @@ -304,6 +312,28 @@ bool reachedConsistency = false; static char *replay_image_masked = NULL; static char *primary_image_masked = NULL; +/* + * Maximum number of applied records in batch before notifying walsender during + * cascade replication + */ +int cascadeReplicationMaxBatchSize; + +/* + * Maximum batching delay before notifying walsender during cascade replication + */ +int cascadeReplicationMaxBatchDelay; + +/* Current cascade replication batching delay used while enabling timer */ +static int cascadeDelayCurrent = 0; + +/* Counter for applied records which are not yet signaled to walsenders */ +static int appliedRecords = 0; + +/* + * True if downstream walsenders need to be notified about pending WAL records, + * set by the timeout handler and by the batching GUC assign hook. + */ +volatile sig_atomic_t replicationNotificationPending = false; /* * Shared-memory state for WAL recovery. @@ -1846,6 +1876,15 @@ PerformWalRecovery(void) * end of main redo apply loop */ + /* Send notification for batched messages once loop is ended */ + if (StandbyWithCascadeReplication() && appliedRecords > 0) + { + if (cascadeDelayCurrent > 0) + disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false); + appliedRecords = 0; + WalSndWakeup(false, true); + } + if (reachedRecoveryTarget) { if (!reachedConsistency) @@ -2044,8 +2083,45 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl * be created otherwise) * ------ */ - if (AllowCascadeReplication()) - WalSndWakeup(switchedTLI, true); + + if (StandbyWithCascadeReplication()) + { + if (cascadeReplicationMaxBatchSize <= 1 && appliedRecords == 0) + WalSndWakeup(switchedTLI, true); + else + { + /* + * If timeline has switched, then we will immediately notify both + * physical and logical downstream walsenders here, as we do not + * want to introduce additional delay in such case. 
Otherwise we + * will wait until we apply specified number of records before + * notifying downstream logical walsenders. + */ + bool batchFlushRequired = + ++appliedRecords >= cascadeReplicationMaxBatchSize || + replicationNotificationPending || + switchedTLI; + + if (batchFlushRequired) + { + if (cascadeDelayCurrent > 0) + disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false); + appliedRecords = 0; + replicationNotificationPending = false; + } + + WalSndWakeup(switchedTLI, batchFlushRequired); + + /* Setup timeout to limit maximum delay for notifications */ + if (appliedRecords == 1) + { + cascadeDelayCurrent = cascadeReplicationMaxBatchDelay; + if (cascadeDelayCurrent > 0) + enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT, + cascadeDelayCurrent); + } + } + } /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the @@ -5095,3 +5171,50 @@ assign_recovery_target_xid(const char *newval, void *extra) else recoveryTarget = RECOVERY_TARGET_UNSET; } + +/* + * GUC assign_hook for cascade_replication_batch_size and + * cascade_replication_batch_delay + */ +void +assign_cascade_replication_batch_values(int new_value, void *extra) +{ + /* + * If either cascade_replication_batch_size or + * cascade_replication_batch_delay is changed, then we want to disable + * current timer (if any) and immediately flush current batch. New values + * will be picked once next WAL record is applied. 
+ */ + if (cascadeDelayCurrent > 0) + { + cascadeDelayCurrent = 0; + disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false); + } + /* Will be processed by ProcessStartupProcInterrupts */ + replicationNotificationPending = true; +} + +/* + * Send notifications to downstream walsenders if there are batched records + */ +void +StandbyWalCheckSendNotify(void) +{ + if (appliedRecords > 0) + { + WalSndWakeup(false, true); + appliedRecords = 0; + } + replicationNotificationPending = false; +} + +/* + * Timer handler for batch notifications in cascade replication + */ +void +StandbyWalSendTimeoutHandler(void) +{ + replicationNotificationPending = true; + /* Most likely process is waiting for arrival of WAL records */ + WakeupRecovery(); +} diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c index 27e86cf393f6..555090c82a09 100644 --- a/src/backend/postmaster/startup.c +++ b/src/backend/postmaster/startup.c @@ -189,6 +189,10 @@ ProcessStartupProcInterrupts(void) if (ProcSignalBarrierPending) ProcessProcSignalBarrier(); + /* Send a notification to downstream walsenders if required */ + if (replicationNotificationPending) + StandbyWalCheckSendNotify(); + /* Perform logging of memory contexts of this process */ if (LogMemoryContextPending) ProcessLogMemoryContextInterrupt(); @@ -246,6 +250,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len) RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler); RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler); RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler); + RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler); /* * Unblock signals (they were blocked when the postmaster forked us) diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index d6fc83338505..0c47172c8d1d 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -1069,6 
+1069,27 @@ max => 'INT_MAX', }, +{ name => 'cascade_replication_batch_size', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', + short_desc => 'Sets the maximum number of applied WAL records before cascade walsenders are notified on standby.', + long_desc => '0 disables records batching in cascade replication.', + variable => 'cascadeReplicationMaxBatchSize', + boot_val => '0', + min => '0', + max => 'INT_MAX', + assign_hook => 'assign_cascade_replication_batch_values', +}, + +{ name => 'cascade_replication_batch_delay', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', + short_desc => 'Sets the maximum time before cascade walsenders are notified on standby about applied records.', + long_desc => '0 disables timed notifications. This option works only if cascade_replication_batch_size is greater than 0.', + flags => 'GUC_UNIT_MS', + variable => 'cascadeReplicationMaxBatchDelay', + boot_val => '500', + min => '0', + max => 'INT_MAX', + assign_hook => 'assign_cascade_replication_batch_values', +}, + { name => 'max_connections', type => 'int', context => 'PGC_POSTMASTER', group => 'CONN_AUTH_SETTINGS', short_desc => 'Sets the maximum number of concurrent connections.', variable => 'MaxConnections', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index f62b61967ef6..313a4fa93420 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -376,6 +376,12 @@ # is not set #wal_receiver_status_interval = 10s # send replies at least this often # 0 disables +#cascade_replication_batch_size = 0 # maximum number of applied WAL records + # before cascade walsenders are notified on standby + # 0 disables records batching in cascade replication +#cascade_replication_batch_delay = 500ms # maximum time before cascade walsenders + # are notified about applied records if batching is enabled + # 0 disables timed notifications 
during batching #hot_standby_feedback = off # send info from standby to prevent # query conflicts #wal_receiver_timeout = 60s # time that receiver waits for diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 8e475e266d18..75a09d4b9e33 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -11,6 +11,8 @@ #ifndef XLOGRECOVERY_H #define XLOGRECOVERY_H +#include <signal.h> + #include "access/xlogreader.h" #include "catalog/pg_control.h" #include "lib/stringinfo.h" @@ -67,6 +69,8 @@ extern PGDLLIMPORT char *PrimarySlotName; extern PGDLLIMPORT char *recoveryRestoreCommand; extern PGDLLIMPORT char *recoveryEndCommand; extern PGDLLIMPORT char *archiveCleanupCommand; +extern PGDLLIMPORT int cascadeReplicationMaxBatchSize; +extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay; /* indirectly set via GUC system */ extern PGDLLIMPORT TransactionId recoveryTargetXid; @@ -165,4 +169,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, extern void xlog_outdesc(StringInfo buf, XLogReaderState *record); +extern PGDLLIMPORT volatile sig_atomic_t replicationNotificationPending; +extern void StandbyWalCheckSendNotify(void); +extern void StandbyWalSendTimeoutHandler(void); + #endif /* XLOGRECOVERY_H */ diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 82ac8646a8d4..370ad79d5e54 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -118,6 +118,7 @@ extern void assign_recovery_target_timeline(const char *newval, void *extra); extern bool check_recovery_target_xid(char **newval, void **extra, GucSource source); extern void assign_recovery_target_xid(const char *newval, void *extra); +extern void assign_cascade_replication_batch_values(int new_value, void *extra); extern bool check_role(char **newval, void **extra, GucSource source); extern void assign_role(const char *newval, void *extra); extern const char *show_role(void); diff --git 
a/src/include/utils/timeout.h b/src/include/utils/timeout.h index 7b19beafdc95..0062cb562b94 100644 --- a/src/include/utils/timeout.h +++ b/src/include/utils/timeout.h @@ -36,6 +36,7 @@ typedef enum TimeoutId IDLE_STATS_UPDATE_TIMEOUT, CLIENT_CONNECTION_CHECK_TIMEOUT, STARTUP_PROGRESS_TIMEOUT, + STANDBY_CASCADE_WAL_SEND_TIMEOUT, /* First user-definable timeout reason */ USER_TIMEOUT, /* Maximum number of timeout reasons */