From ff02b32f35f0ebc1ff3202ce0489ec5bb7c7a6a5 Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Fri, 27 Jun 2025 09:16:23 +0530 Subject: [PATCH] Report output plugin statistics in pg_stat_replication_slots As of now pg_stat_replication_slots reports statistics about the reorder buffer, but it does not report output plugin statistics like the amount of data filtered by the output plugin, amount of data sent downstream or the number of transactions sent downstream. This statistics is useful when investigating issues related to a slow downstream. This commit adds following fields to pg_stat_replication_slots - plugin_filtered_bytes is the amount of changes filtered out by the output plugin - plugin_sent_txns is the amount of transactions sent downstream by the output plugin - plugin_sent_bytes is the amount of data sent downstream by the output plugin. The prefix "plugin_" indicates that these counters are related to and maintained by the output plugin. An output plugin may choose not to initialize LogicalDecodingContext::stats, which holds these counters, in which case the above columns will be reported as NULL. When the stats are disabled after being enabled for a while, the plugin stats are reset to 0, rather than carrying over the stale stats from the time when the plugin was supporting the stats. This does not matter if the plugin continues not to support statistics forever. But in case it was supporting the stats once, discontinued doing so at some point in time and then starts supporting the stats later, accumulating the new stats based on the earlier accumulated stats could be misleading. Filtered bytes are reported next to total_bytes to keep these two closely related fields together. Additionally report name of the output plugin in the view for an easy reference. total_bytes and total_txns are the only fields remaining unqualified - they do not convey what those bytes and txns are. Hence rename them total_wal_bytes and total_wal_txns respectively to indicate that those counts come from WAL stream. Author: Ashutosh Bapat Reviewed-by: Shveta Malik Reviewed-by: Bertrand Drouvot Reviewed-by: Amit Kapila Reviewed-by: Ashutosh Sharma Discussion: https://www.postgresql.org/message-id/CAExHW5s6KntzUyUoMbKR5dgwRmdV2Ay_2+AnTgYGAzo=Qv61wA@mail.gmail.com --- contrib/test_decoding/expected/stats.out | 77 ++++++++++--------- contrib/test_decoding/sql/stats.sql | 16 ++-- contrib/test_decoding/t/001_repl_stats.pl | 22 ++++-- contrib/test_decoding/test_decoding.c | 2 + doc/src/sgml/logicaldecoding.sgml | 36 +++++++++ doc/src/sgml/monitoring.sgml | 70 +++++++++++++++-- src/backend/catalog/system_views.sql | 8 +- src/backend/replication/logical/logical.c | 28 ++++++- .../replication/logical/logicalfuncs.c | 12 +++ .../replication/logical/reorderbuffer.c | 3 +- src/backend/replication/pgoutput/pgoutput.c | 33 +++++++- src/backend/replication/walsender.c | 7 ++ src/backend/utils/activity/pgstat_replslot.c | 19 ++++- src/backend/utils/adt/pgstatfuncs.c | 34 ++++++-- src/include/catalog/pg_proc.dat | 6 +- src/include/pgstat.h | 8 +- src/include/replication/logical.h | 1 + src/include/replication/output_plugin.h | 13 ++++ src/include/replication/reorderbuffer.h | 1 + src/test/recovery/t/006_logical_decoding.pl | 12 +-- .../t/035_standby_logical_decoding.pl | 4 +- src/test/regress/expected/rules.out | 10 ++- src/test/subscription/t/001_rep_changes.pl | 11 +++ src/test/subscription/t/010_truncate.pl | 20 +++++ src/test/subscription/t/028_row_filter.pl | 11 +++ src/tools/pgindent/typedefs.list | 1 + 26 files changed, 375 insertions(+), 90 deletions(-) diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index 28da9123cc8b..0e5c5fa5b187 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -37,12 +37,17 @@ SELECT pg_stat_force_next_flush(); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count -------------------------+------------+-------------+------------+-------------+-------------------- - regression_slot_stats1 | t | t | t | t | t - regression_slot_stats2 | t | t | t | t | t - regression_slot_stats3 | t | t | t | t | t +-- total_wal_txns may vary based on the background activity but plugin_sent_txns +-- should always be 1 since the background transactions are always skipped. +-- Filtered bytes would be set only when there's a change that was passed to the +-- plugin but was filtered out. Depending upon the background transactions, +-- filtered bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count +------------------------+------------+-------------+----------------+-----------------+------------------+------------+----------------+-------------------- + regression_slot_stats1 | t | t | t | t | 1 | t | t | t + regression_slot_stats2 | t | t | t | t | 1 | t | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t | t (3 rows) RESET logical_decoding_work_mem; @@ -53,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count -------------------------+------------+-------------+------------+-------------+-------------------- - regression_slot_stats1 | t | t | f | f | t - regression_slot_stats2 | t | t | t | t | t - regression_slot_stats3 | t | t | t | t | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count +------------------------+------------+-------------+----------------+-----------------+------------------+------------+----------------+-------------------- + regression_slot_stats1 | t | t | f | f | | | | t + regression_slot_stats2 | t | t | t | t | 1 | t | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t | t (3 rows) -- reset stats for all slots @@ -68,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count -------------------------+------------+-------------+------------+-------------+-------------------- - regression_slot_stats1 | t | t | f | f | t - regression_slot_stats2 | t | t | f | f | t - regression_slot_stats3 | t | t | f | f | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | plugin_sent_bytes | plugin_filtered_bytes | mem_exceeded_count +------------------------+------------+-------------+----------------+-----------------+------------------+-------------------+-----------------------+-------------------- + regression_slot_stats1 | t | t | f | f | | | | t + regression_slot_stats2 | t | t | f | f | | | | t + regression_slot_stats3 | t | t | f | f | | | | t (3 rows) -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) SELECT pg_stat_reset_replication_slot('do-not-exist'); ERROR: replication slot "do-not-exist" does not exist SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) -- spilling the xact @@ -121,20 +126,20 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, -- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de BEGIN; -SELECT slot_name FROM pg_stat_replication_slots; - slot_name ------------------------- - regression_slot_stats1 - regression_slot_stats2 - regression_slot_stats3 +SELECT slot_name, plugin FROM pg_stat_replication_slots; + slot_name | plugin +------------------------+--------------- + regression_slot_stats1 | test_decoding + regression_slot_stats2 | test_decoding + regression_slot_stats3 | test_decoding (3 rows) -SELECT slot_name FROM pg_stat_replication_slots; - slot_name ------------------------- - regression_slot_stats1 - regression_slot_stats2 - regression_slot_stats3 +SELECT slot_name, plugin FROM pg_stat_replication_slots; + slot_name | plugin +------------------------+--------------- + regression_slot_stats1 | test_decoding + regression_slot_stats2 | test_decoding + regression_slot_stats3 | test_decoding (3 rows) COMMIT; diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 6661dbcb85c3..d6bf3cde8b11 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -15,16 +15,22 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL, SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1'); SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1'); SELECT pg_stat_force_next_flush(); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + +-- total_wal_txns may vary based on the background activity but plugin_sent_txns +-- should always be 1 since the background transactions are always skipped. +-- Filtered bytes would be set only when there's a change that was passed to the +-- plugin but was filtered out. Depending upon the background transactions, +-- filtered bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; RESET logical_decoding_work_mem; -- reset stats for one slot, others should be unaffected SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; -- reset stats for all slots SELECT pg_stat_reset_replication_slot(NULL); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); @@ -46,8 +52,8 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, -- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de BEGIN; -SELECT slot_name FROM pg_stat_replication_slots; -SELECT slot_name FROM pg_stat_replication_slots; +SELECT slot_name, plugin FROM pg_stat_replication_slots; +SELECT slot_name, plugin FROM pg_stat_replication_slots; COMMIT; diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl index 0de62edb7d84..756fc691ed6f 100644 --- a/contrib/test_decoding/t/001_repl_stats.pl +++ b/contrib/test_decoding/t/001_repl_stats.pl @@ -23,10 +23,16 @@ sub test_slot_stats my ($node, $expected, $msg) = @_; + # If there are background transactions which are filtered out by the output + # plugin, plugin_filtered_bytes may be greater than 0. But it's not + # guaranteed that such transactions would be present. my $result = $node->safe_psql( 'postgres', qq[ - SELECT slot_name, total_txns > 0 AS total_txn, - total_bytes > 0 AS total_bytes + SELECT slot_name, total_wal_txns > 0 AS total_txn, + total_wal_bytes > 0 AS total_bytes, + plugin_sent_txns > 0 AS sent_txn, + plugin_sent_bytes > 0 AS sent_bytes, + plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name]); is($result, $expected, $msg); @@ -65,7 +71,7 @@ sub test_slot_stats 'postgres', qq[ SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots WHERE slot_name ~ 'regression_slot' - AND total_txns > 0 AND total_bytes > 0; + AND total_wal_txns > 0 AND total_wal_bytes > 0; ]) or die "Timed out while waiting for statistics to be updated"; # Test to drop one of the replication slot and verify replication statistics data is @@ -80,9 +86,9 @@ sub test_slot_stats # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t -regression_slot3|t|t), + qq(regression_slot1|t|t|t|t|t +regression_slot2|t|t|t|t|t +regression_slot3|t|t|t|t|t), 'check replication statistics are updated'); # Test to remove one of the replication slots and adjust @@ -104,8 +110,8 @@ sub test_slot_stats # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t), + qq(regression_slot1|t|t|t|t|t +regression_slot2|t|t|t|t|t), 'check replication statistics after removing the slot file'); # cleanup diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 36e77c69e1c1..d06f6c3f92b4 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -173,6 +173,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->only_local = false; ctx->output_plugin_private = data; + ctx->stats = palloc0(sizeof(OutputPluginStats)); opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; opt->receive_rewrites = false; @@ -310,6 +311,7 @@ static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) { OutputPluginPrepareWrite(ctx, last_write); + ctx->stats->sentTxns++; if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); else diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index b803a819cf1f..0bf9ffbfd286 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -938,6 +938,42 @@ typedef struct OutputPluginOptions needs to have a state, it can use ctx->output_plugin_private to store it. + + + The startup callback may initialize ctx->stats, + typically as follows, if it chooses to maintain and report statistics + about its activity in pg_stat_replication_slots. + +ctx->stats = palloc0(sizeof(OutputPluginStats)); + + where OutputPluginStats is defined as follows: + +typedef struct OutputPluginStats +{ + int64 sentTxns; + int64 sentBytes; + int64 filteredBytes; +} OutputPluginStats; + + sentTxns is the number of transactions sent downstream + by the output plugin. sentBytes is the amount of data, + in bytes, sent downstream by the output plugin. + filteredBytes is the size of changes, in bytes, that + are filtered out by the output plugin. + OutputPluginWrite will update + sentBytes if ctx->stats is + initialized by the output plugin. Function + ReorderBufferChangeSize may be used to find the size of + filtered ReorderBufferChange. + + + + + Once a plugin starts reporting and maintaining these statistics, it is + not expected that they will discontinue doing so. If they do, the result + may be misleading because of the cumulative nature of these statistics. + + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index f3bf527d5b4b..7f30094b2286 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1549,6 +1549,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + plugin text + + + The base name of the shared object containing the output plugin this + logical slot is using. This column is same as the one in + pg_replication_slots. + + + spill_txns bigint @@ -1637,19 +1648,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage - total_txns bigint + total_wal_txns bigint - Number of decoded transactions sent to the decoding output plugin for - this slot. This counts top-level transactions only, and is not incremented - for subtransactions. Note that this includes the transactions that are - streamed and/or spilled. + Number of decoded transactions from WAL sent to the decoding output + plugin for this slot. This counts top-level transactions only, and is + not incremented for subtransactions. Note that this includes the + transactions that are streamed and/or spilled. - total_bytesbigint + total_wal_bytesbigint Amount of transaction data decoded for sending transactions to the @@ -1659,6 +1670,53 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + plugin_filtered_bytes bigint + + + Amount of changes, from total_wal_bytes, filtered + out by the output plugin and not sent downstream. Please note that it + does not include the changes filtered before a change is sent to + the output plugin, e.g. the changes filtered by origin. The counter is + maintained by the output plugin mentioned in + plugin. It is NULL when statistics is not + initialized or immediately after a reset or when not maintained by the + output plugin. + + + + + + plugin_sent_txns bigint + + + Number of decoded transactions sent downstream for this slot. This + counts top-level transactions only, and is not incremented for + subtransactions. These transactions are subset of transactions sent to + the decoding plugin. Hence this count is expected to be less than or + equal to total_wal_txns. The counter is maintained + by the output plugin mentioned in plugin. It + is NULL when statistics is not initialized or immediately after a reset or + when not maintained by the output plugin. + + + + + + plugin_sent_bytesbigint + + + Amount of transaction changes sent downstream for this slot by the + output plugin after applying filtering and converting into its output + format. The counter is maintained by the output plugin mentioned in + plugin. It is NULL when statistics is not + initialized or immediately after a reset or when not maintained by the + output plugin. + + + + stats_reset timestamp with time zone diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index dec8df4f8ee6..defca1cf9acf 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1067,6 +1067,7 @@ CREATE VIEW pg_replication_slots AS CREATE VIEW pg_stat_replication_slots AS SELECT s.slot_name, + r.plugin, s.spill_txns, s.spill_count, s.spill_bytes, @@ -1074,8 +1075,11 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_count, s.stream_bytes, s.mem_exceeded_count, - s.total_txns, - s.total_bytes, + s.total_wal_txns, + s.total_wal_bytes, + s.plugin_filtered_bytes, + s.plugin_sent_txns, + s.plugin_sent_bytes, s.stats_reset FROM pg_replication_slots as r, LATERAL pg_stat_get_replication_slot(slot_name) as s diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 93ed2eb368e1..f0810f05153a 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1952,6 +1952,7 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; + OutputPluginStats *stats = ctx->stats; PgStat_StatReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. */ @@ -1959,7 +1960,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->memExceededCount <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " (%s) %" PRId64 " %" PRId64 " %" PRId64, rb, rb->spillTxns, rb->spillCount, @@ -1969,7 +1970,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamBytes, rb->memExceededCount, rb->totalTxns, - rb->totalBytes); + rb->totalBytes, + stats ? "plugin has stats" : "plugin has no stats", + stats ? stats->sentTxns : 0, + stats ? stats->sentBytes : 0, + stats ? stats->filteredBytes : 0); repSlotStat.spill_txns = rb->spillTxns; repSlotStat.spill_count = rb->spillCount; @@ -1978,8 +1983,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_count = rb->streamCount; repSlotStat.stream_bytes = rb->streamBytes; repSlotStat.mem_exceeded_count = rb->memExceededCount; - repSlotStat.total_txns = rb->totalTxns; - repSlotStat.total_bytes = rb->totalBytes; + repSlotStat.total_wal_txns = rb->totalTxns; + repSlotStat.total_wal_bytes = rb->totalBytes; + if (stats) + { + repSlotStat.plugin_has_stats = true; + repSlotStat.plugin_sent_txns = stats->sentTxns; + repSlotStat.plugin_sent_bytes = stats->sentBytes; + repSlotStat.plugin_filtered_bytes = stats->filteredBytes; + } + else + repSlotStat.plugin_has_stats = false; pgstat_report_replslot(ctx->slot, &repSlotStat); @@ -1992,6 +2006,12 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->memExceededCount = 0; rb->totalTxns = 0; rb->totalBytes = 0; + if (stats) + { + stats->sentTxns = 0; + stats->sentBytes = 0; + stats->filteredBytes = 0; + } } /* diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 25f890ddeeda..55e02e7ee217 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -65,6 +65,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi Datum values[3]; bool nulls[3]; DecodingOutputState *p; + int64 sentBytes = 0; /* SQL Datums can only be of a limited length... */ if (ctx->out->len > MaxAllocSize - VARHDRSZ) @@ -74,7 +75,9 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi memset(nulls, 0, sizeof(nulls)); values[0] = LSNGetDatum(lsn); + sentBytes += sizeof(XLogRecPtr); values[1] = TransactionIdGetDatum(xid); + sentBytes += sizeof(TransactionId); /* * Assert ctx->out is in database encoding when we're writing textual @@ -87,8 +90,17 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi /* ick, but cstring_to_text_with_len works for bytea perfectly fine */ values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len)); + sentBytes += ctx->out->len; tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); + + /* + * If output plugin has chosen to maintain its stats, update the amount of + * data sent downstream. + */ + if (ctx->stats) + ctx->stats->sentBytes += sentBytes; + p->returned_rows++; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b57aef9916de..d336ef3a51f0 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -310,7 +310,6 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t * memory accounting * --------------------------------------- */ -static Size ReorderBufferChangeSize(ReorderBufferChange *change); static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, @@ -4458,7 +4457,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* * Size of a change in memory. */ -static Size +Size ReorderBufferChangeSize(ReorderBufferChange *change) { Size sz = sizeof(ReorderBufferChange); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 942e1abdb584..367ba9efab39 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -473,6 +473,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, MemoryContextRegisterResetCallback(ctx->context, mcallback); ctx->output_plugin_private = data; + ctx->stats = palloc0(sizeof(OutputPluginStats)); /* This plugin uses binary protocol. */ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; @@ -614,6 +615,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); txndata->sent_begin_txn = true; + ctx->stats->sentTxns++; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -1492,7 +1494,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *new_slot = NULL; if (!is_publishable_relation(relation)) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } /* * Remember the xid for the change in streaming mode. We need to send xid @@ -1510,15 +1515,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } break; case REORDER_BUFFER_CHANGE_UPDATE: if (!relentry->pubactions.pubupdate) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } break; case REORDER_BUFFER_CHANGE_DELETE: if (!relentry->pubactions.pubdelete) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } /* * This is only possible if deletes are allowed even when replica @@ -1528,6 +1542,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!change->data.tp.oldtuple) { elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; } break; @@ -1583,7 +1598,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * of the row filter for old and new tuple. */ if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action)) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); goto cleanup; + } /* * Send BEGIN if we haven't yet. @@ -1603,7 +1621,11 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); - /* Send the data */ + /* + * Send the data. Even if we end up filtering some columns while sending + * the message, we won't filter the change, as a whole. Hence we don't + * increment filteredBytes. + */ switch (action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -1710,7 +1732,16 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, change->data.truncate.cascade, change->data.truncate.restart_seqs); OutputPluginWrite(ctx, true); + + /* + * Even if we filtered out some relations, we still send a TRUNCATE + * message for the remaining relations. Since the change, as a whole, + * is not filtered out we don't increment filteredBytes. + */ } + else + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); + MemoryContextSwitchTo(old); MemoryContextReset(data->context); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 548eafa7a732..b0a5d4da7a79 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1587,6 +1587,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len); + /* + * If output plugin maintains statistics, update the amount of data sent + * downstream. + */ + if (ctx->stats) + ctx->stats->sentBytes += ctx->out->len + 1; /* +1 for the 'd' */ + CHECK_FOR_INTERRUPTS(); /* Try to flush pending output to the client */ diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index d210c261ac65..42ca13bd76a2 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -88,6 +88,7 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re /* Update the replication slot statistics */ #define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld +#define REPLSLOT_SET_TO_ZERO(fld) statent->fld = 0 REPLSLOT_ACC(spill_txns); REPLSLOT_ACC(spill_count); REPLSLOT_ACC(spill_bytes); @@ -95,9 +96,23 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re REPLSLOT_ACC(stream_count); REPLSLOT_ACC(stream_bytes); REPLSLOT_ACC(mem_exceeded_count); - REPLSLOT_ACC(total_txns); - REPLSLOT_ACC(total_bytes); + REPLSLOT_ACC(total_wal_txns); + REPLSLOT_ACC(total_wal_bytes); + statent->plugin_has_stats = repSlotStat->plugin_has_stats; + if (repSlotStat->plugin_has_stats) + { + REPLSLOT_ACC(plugin_sent_txns); + REPLSLOT_ACC(plugin_sent_bytes); + REPLSLOT_ACC(plugin_filtered_bytes); + } + else + { + REPLSLOT_SET_TO_ZERO(plugin_sent_txns); + REPLSLOT_SET_TO_ZERO(plugin_sent_bytes); + REPLSLOT_SET_TO_ZERO(plugin_filtered_bytes); + } #undef REPLSLOT_ACC +#undef REPLSLOT_SET_TO_ZERO pgstat_unlock_entry(entry_ref); } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index a710508979e4..672b01a246dd 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2129,7 +2129,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 11 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 14 text *slotname_text = PG_GETARG_TEXT_P(0); NameData slotname; TupleDesc tupdesc; @@ -2156,11 +2156,17 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 8, "mem_exceeded_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_txns", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_wal_txns", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_wal_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "plugin_filtered_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "plugin_sent_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "plugin_sent_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 14, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2184,13 +2190,25 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[5] = Int64GetDatum(slotent->stream_count); values[6] = Int64GetDatum(slotent->stream_bytes); values[7] = Int64GetDatum(slotent->mem_exceeded_count); - values[8] = Int64GetDatum(slotent->total_txns); - values[9] = Int64GetDatum(slotent->total_bytes); + values[8] = Int64GetDatum(slotent->total_wal_txns); + values[9] = Int64GetDatum(slotent->total_wal_bytes); + if (slotent->plugin_has_stats) + { + values[10] = Int64GetDatum(slotent->plugin_filtered_bytes); + values[11] = Int64GetDatum(slotent->plugin_sent_txns); + values[12] = Int64GetDatum(slotent->plugin_sent_bytes); + } + else + { + nulls[10] = true; + nulls[11] = true; + nulls[12] = true; + } if (slotent->stat_reset_timestamp == 0) - nulls[10] = true; + nulls[13] = true; else - values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp); + values[13] = TimestampTzGetDatum(slotent->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 34b7fddb0e7a..01f88e911fc4 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5691,9 +5691,9 @@ { oid => '6169', descr => 'statistics: information about replication slot', proname => 'pg_stat_get_replication_slot', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'text', - proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,stats_reset}', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_wal_txns,total_wal_bytes,plugin_filtered_bytes,plugin_sent_txns,plugin_sent_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, { oid => '6230', descr => 'statistics: check if a stats object exists', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 7ae503e71a27..427cf55d4b63 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -396,8 +396,12 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter stream_count; PgStat_Counter stream_bytes; PgStat_Counter mem_exceeded_count; - PgStat_Counter total_txns; - PgStat_Counter total_bytes; + PgStat_Counter total_wal_txns; + PgStat_Counter total_wal_bytes; + bool plugin_has_stats; + PgStat_Counter plugin_sent_txns; + PgStat_Counter plugin_sent_bytes; + PgStat_Counter plugin_filtered_bytes; TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 2e562bee5a9c..010c59f783d6 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -52,6 +52,7 @@ typedef struct LogicalDecodingContext OutputPluginCallbacks callbacks; OutputPluginOptions options; + OutputPluginStats *stats; /* * User specified options diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 8d4d5b71887d..4cc939e6c982 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -29,6 +29,19 @@ typedef struct OutputPluginOptions bool receive_rewrites; } OutputPluginOptions; +/* + * Statistics about the transactions decoded and sent downstream by the output + * plugin. + */ +typedef struct OutputPluginStats +{ + int64 sentTxns; /* number of transactions decoded and sent + * downstream */ + int64 sentBytes; /* amount of data decoded and sent downstream */ + int64 filteredBytes; /* amount of data from reorder buffer that was + * filtered out by the output plugin */ +} OutputPluginStats; + /* * Type of the shared library symbol _PG_output_plugin_init that is looked up * when loading an output plugin shared library. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 3cbe106a3c78..382eba66a765 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -718,6 +718,7 @@ extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids); extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert); +extern Size ReorderBufferChangeSize(ReorderBufferChange *change); extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 96b70b84d5e1..49bfe679249b 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -214,10 +214,10 @@ # Stats exist for stats test slot 1 is( $node_primary->safe_psql( 'postgres', - qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT total_wal_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) + qq(t|t|t), + qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) ); # Do reset of stats for stats test slot 1 @@ -235,10 +235,10 @@ is( $node_primary->safe_psql( 'postgres', - qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT stats_reset > '$reset1'::timestamptz, total_wal_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.) + qq(t|t|t), + qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_wal_bytes and plugin_sent_bytes were set to 0 and NULL respectively.) ); # Check that test slot 2 has NULL in reset timestamp diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index ebe2fae17898..5f4df30d65ae 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -577,7 +577,7 @@ sub wait_until_vacuum_can_remove qq[INSERT INTO decoding_test(x,y) SELECT 100,'100';]); $node_standby->poll_query_until('testdb', - qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] + qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] ) or die "replication slot stats of vacuum_full_activeslot not updated"; # This should trigger the conflict @@ -605,7 +605,7 @@ sub wait_until_vacuum_can_remove # Ensure that replication slot stats are not removed after invalidation. is( $node_standby->safe_psql( 'testdb', - qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] + qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] ), 't', 'replication slot stats not removed after invalidation'); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2bf968ae3d37..7e12b9712696 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2142,6 +2142,7 @@ pg_stat_replication| SELECT s.pid, JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, + r.plugin, s.spill_txns, s.spill_count, s.spill_bytes, @@ -2149,11 +2150,14 @@ pg_stat_replication_slots| SELECT s.slot_name, s.stream_count, s.stream_bytes, s.mem_exceeded_count, - s.total_txns, - s.total_bytes, + s.total_wal_txns, + s.total_wal_bytes, + s.plugin_filtered_bytes, + s.plugin_sent_txns, + s.plugin_sent_bytes, s.stats_reset FROM pg_replication_slots r, - LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, stats_reset) + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_wal_txns, total_wal_bytes, plugin_filtered_bytes, plugin_sent_txns, plugin_sent_bytes, stats_reset) WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT name, blks_zeroed, diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 430c1246d14c..7f37b6fe6c6e 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -124,6 +124,9 @@ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); is($result, qq(1002), 'check initial data was copied to subscriber'); +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_ins SELECT generate_series(1,50)"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20"); @@ -157,6 +160,14 @@ $node_publisher->wait_for_catchup('tap_sub'); +# Verify that plugin_filtered_bytes increases due to filtered update and delete +# operations on tab_ins. We cannot test the exact value since it may include +# changes from other concurrent transactions. +my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after DML filtering'); + $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl index 3d16c2a800de..c41ad317221d 100644 --- a/src/test/subscription/t/010_truncate.pl +++ b/src/test/subscription/t/010_truncate.pl @@ -69,6 +69,9 @@ # Wait for initial sync of all subscriptions $node_subscriber->wait_for_subscription_sync; +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'sub2'"); + # insert data to truncate $node_subscriber->safe_psql('postgres', @@ -98,6 +101,16 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')"); is($result, qq(101), 'truncate restarted identities'); +# All the DMLs above happen on tables that are subscribed to by sub1 and not +# sub2. plugin_filtered_bytes should get incremented for replication slot +# corresponding to the subscription sub2. We can not test the exact value of +# plugin_filtered_bytes because the counter is affected by background activity. +my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after publication level filtering'); +$initial_filtered_bytes = $final_filtered_bytes; + # test publication that does not replicate truncate $node_subscriber->safe_psql('postgres', @@ -107,6 +120,13 @@ $node_publisher->wait_for_catchup('sub2'); +# Truncate changes are filtered out at publication level itself. Make sure that +# the plugin_filtered_bytes is incremented. +$final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after truncate filtering'); + $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab2"); is($result, qq(3|1|3), 'truncate not replicated'); diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl index e2c836700535..039bf5ff5a02 100644 --- a/src/test/subscription/t/028_row_filter.pl +++ b/src/test/subscription/t/028_row_filter.pl @@ -579,6 +579,9 @@ # commands are for testing normal logical replication behavior. # # test row filter (INSERT, UPDATE, DELETE) +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')"); $node_publisher->safe_psql('postgres', @@ -612,6 +615,14 @@ $node_publisher->wait_for_catchup($appname); +# The changes which do not pass the row filter will be filtered. Make sure that +# the plugin_filtered_bytes reflects that. We can not test the exact value of +# plugin_filtered_bytes since it is affected by background activity. +my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after row filtering'); + # Check expected replicated rows for tab_rowfilter_2 # tap_pub_1 filter is: (c % 2 = 0) # tap_pub_2 filter is: (c % 3 = 0) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 432509277c98..f2d9d7876489 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1835,6 +1835,7 @@ OuterJoinClauseInfo OutputPluginCallbacks OutputPluginOptions OutputPluginOutputType +OutputPluginStats OverridingKind PACE_HEADER PACL