diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 0a5ae5050c46..ce63b3e599c2 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -65,6 +65,7 @@ #include "utils/pg_lsn.h" #include "utils/ps_status.h" #include "utils/pg_rusage.h" +#include "utils/timeout.h" /* Unsupported old recovery command file names (relative to $PGDATA) */ #define RECOVERY_COMMAND_FILE "recovery.conf" @@ -148,6 +149,13 @@ bool InArchiveRecovery = false; static bool StandbyModeRequested = false; bool StandbyMode = false; +/* + * Whether we are currently in process of processing recovery records while + * allowing downstream replication instances + */ +#define StandbyWithCascadeReplication() \ + (AmStartupProcess() && StandbyMode && AllowCascadeReplication()) + /* was a signal file present at startup? */ static bool standby_signal_file_found = false; static bool recovery_signal_file_found = false; @@ -304,6 +312,28 @@ bool reachedConsistency = false; static char *replay_image_masked = NULL; static char *primary_image_masked = NULL; +/* + * Maximum number of applied records in batch before notifying walsender during + * cascade replication + */ +int cascadeReplicationMaxBatchSize; + +/* + * Maximum batching delay before notifying walsender during cascade replication + */ +int cascadeReplicationMaxBatchDelay; + +/* Current cascade replication batching delay used while enabling timer */ +static int cascadeDelayCurrent = 0; + +/* Counter for applied records which are not yet signaled to walsenders */ +static int appliedRecords = 0; + +/* + * True if downstream walsenders need to be notified about pending WAL records, + * set by timeout handler. + */ +volatile sig_atomic_t replicationNotificationPending = false; /* * Shared-memory state for WAL recovery. @@ -1846,6 +1876,15 @@ PerformWalRecovery(void) * end of main redo apply loop */ + /* Send notification for batched messages once loop is ended */ + if (StandbyWithCascadeReplication() && appliedRecords > 0) + { + if (cascadeDelayCurrent > 0) + disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false); + appliedRecords = 0; + WalSndWakeup(false, true); + } + if (reachedRecoveryTarget) { if (!reachedConsistency) @@ -2044,8 +2083,45 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl * be created otherwise) * ------ */ - if (AllowCascadeReplication()) - WalSndWakeup(switchedTLI, true); + + if (StandbyWithCascadeReplication()) + { + if (cascadeReplicationMaxBatchSize <= 1 && appliedRecords == 0) + WalSndWakeup(switchedTLI, true); + else + { + /* + * If time line has switched, then we will imediately notify both + * physical and logical downstream walsenders here, as we do not + * want to introduce additional delay in such case. Otherwise we + * will wait until we apply specified number of records before + * notifying downstream logical walsenders. + */ + bool batchFlushRequired = + ++appliedRecords >= cascadeReplicationMaxBatchSize || + replicationNotificationPending || + switchedTLI; + + if (batchFlushRequired) + { + if (cascadeDelayCurrent > 0) + disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false); + appliedRecords = 0; + replicationNotificationPending = false; + } + + WalSndWakeup(switchedTLI, batchFlushRequired); + + /* Setup timeout to limit maximum delay for notifications */ + if (appliedRecords == 1) + { + cascadeDelayCurrent = cascadeReplicationMaxBatchDelay; + if (cascadeDelayCurrent > 0) + enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT, + cascadeDelayCurrent); + } + } + } /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the @@ -5114,3 +5190,50 @@ assign_recovery_target_xid(const char *newval, void *extra) else recoveryTarget = RECOVERY_TARGET_UNSET; } + +/* + * GUC assign_hook for cascade_replication_batch_size and + * cascade_replication_batch_delay + */ +void +assign_cascade_replication_batch_values(int new_value, void *extra) +{ + /* + * If either cascade_replication_batch_size or + * cascade_replication_batch_delay is changed, then we want to disable + * current timer (if any) and immediately flush current batch. New values + * will be picked once next WAL record is applied. + */ + if (cascadeDelayCurrent > 0) + { + cascadeDelayCurrent = 0; + disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false); + } + /* Will be processed by ProcessStartupProcInterrupts */ + replicationNotificationPending = true; +} + +/* + * Send notifications to downstream walsenders if there are batched records + */ +void +StandbyWalCheckSendNotify(void) +{ + if (appliedRecords > 0) + { + WalSndWakeup(false, true); + appliedRecords = 0; + } + replicationNotificationPending = false; +} + +/* + * Timer handler for batch notifications in cascade replication + */ +void +StandbyWalSendTimeoutHandler(void) +{ + replicationNotificationPending = true; + /* Most likely process is waiting for arrival of WAL records */ + WakeupRecovery(); +} diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c index 27e86cf393f6..555090c82a09 100644 --- a/src/backend/postmaster/startup.c +++ b/src/backend/postmaster/startup.c @@ -189,6 +189,10 @@ ProcessStartupProcInterrupts(void) if (ProcSignalBarrierPending) ProcessProcSignalBarrier(); + /* Send a notification to downstream walsenders if required */ + if (replicationNotificationPending) + StandbyWalCheckSendNotify(); + /* Perform logging of memory contexts of this process */ if (LogMemoryContextPending) ProcessLogMemoryContextInterrupt(); @@ -246,6 +250,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len) RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler); RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler); RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler); + RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler); /* * Unblock signals (they were blocked when the postmaster forked us) diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 25da769eb359..e8aac0c6c829 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -1865,6 +1865,27 @@ max => 'MAX_BACKENDS', }, +{ name => 'cascade_replication_batch_size', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', + short_desc => 'Sets the maximum number of applied WAL records before cascade walsenders are notified on standby.', + long_desc => '0 disables records batching in cascade replication.', + variable => 'cascadeReplicationMaxBatchSize', + boot_val => '0', + min => '0', + max => 'INT_MAX', + assign_hook => 'assign_cascade_replication_batch_values', +}, + +{ name => 'cascade_replication_batch_delay', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY', + short_desc => 'Sets the maximum time before cascade walsenders are notified on standby about applied records.', + long_desc => '0 disables timed notifications. This option works only if cascade_replication_batch_size is greater than 0.', + flags => 'GUC_UNIT_MS', + variable => 'cascadeReplicationMaxBatchDelay', + boot_val => '500', + min => '0', + max => 'INT_MAX', + assign_hook => 'assign_cascade_replication_batch_values', +}, + { name => 'max_connections', type => 'int', context => 'PGC_POSTMASTER', group => 'CONN_AUTH_SETTINGS', short_desc => 'Sets the maximum number of concurrent connections.', variable => 'MaxConnections', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index f62b61967ef6..313a4fa93420 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -376,6 +376,12 @@ # is not set #wal_receiver_status_interval = 10s # send replies at least this often # 0 disables +#cascade_replication_batch_size = 0 # maximum number of applied WAL records + # before cascade walsenders are notified on standby + # 0 disables records batching in cascade replication +#cascade_replication_batch_delay = 500ms # maximum time before cascade walsenders + # are notified about applied records if batching is enabled + # 0 disables timed notifications during batching #hot_standby_feedback = off # send info from standby to prevent # query conflicts #wal_receiver_timeout = 60s # time that receiver waits for diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 8e475e266d18..75a09d4b9e33 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -11,6 +11,8 @@ #ifndef XLOGRECOVERY_H #define XLOGRECOVERY_H +#include + #include "access/xlogreader.h" #include "catalog/pg_control.h" #include "lib/stringinfo.h" @@ -67,6 +69,8 @@ extern PGDLLIMPORT char *PrimarySlotName; extern PGDLLIMPORT char *recoveryRestoreCommand; extern PGDLLIMPORT char *recoveryEndCommand; extern PGDLLIMPORT char *archiveCleanupCommand; +extern PGDLLIMPORT int cascadeReplicationMaxBatchSize; +extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay; /* indirectly set via GUC system */ extern PGDLLIMPORT TransactionId recoveryTargetXid; @@ -165,4 +169,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, extern void xlog_outdesc(StringInfo buf, XLogReaderState *record); +extern PGDLLIMPORT volatile sig_atomic_t replicationNotificationPending; +extern void StandbyWalCheckSendNotify(void); +extern void StandbyWalSendTimeoutHandler(void); + #endif /* XLOGRECOVERY_H */ diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 82ac8646a8d4..370ad79d5e54 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -118,6 +118,7 @@ extern void assign_recovery_target_timeline(const char *newval, void *extra); extern bool check_recovery_target_xid(char **newval, void **extra, GucSource source); extern void assign_recovery_target_xid(const char *newval, void *extra); +extern void assign_cascade_replication_batch_values(int new_value, void *extra); extern bool check_role(char **newval, void **extra, GucSource source); extern void assign_role(const char *newval, void *extra); extern const char *show_role(void); diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h index 7b19beafdc95..0062cb562b94 100644 --- a/src/include/utils/timeout.h +++ b/src/include/utils/timeout.h @@ -36,6 +36,7 @@ typedef enum TimeoutId IDLE_STATS_UPDATE_TIMEOUT, CLIENT_CONNECTION_CHECK_TIMEOUT, STARTUP_PROGRESS_TIMEOUT, + STANDBY_CASCADE_WAL_SEND_TIMEOUT, /* First user-definable timeout reason */ USER_TIMEOUT, /* Maximum number of timeout reasons */