diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index 28da9123cc8b..0e5c5fa5b187 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -37,12 +37,17 @@ SELECT pg_stat_force_next_flush(); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count -------------------------+------------+-------------+------------+-------------+-------------------- - regression_slot_stats1 | t | t | t | t | t - regression_slot_stats2 | t | t | t | t | t - regression_slot_stats3 | t | t | t | t | t +-- total_wal_txns may vary based on the background activity but plugin_sent_txns +-- should always be 1 since the background transactions are always skipped. +-- Filtered bytes would be set only when there's a change that was passed to the +-- plugin but was filtered out. Depending upon the background transactions, +-- filtered bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count +------------------------+------------+-------------+----------------+-----------------+------------------+------------+----------------+-------------------- + regression_slot_stats1 | t | t | t | t | 1 | t | t | t + regression_slot_stats2 | t | t | t | t | 1 | t | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t | t (3 rows) RESET logical_decoding_work_mem; @@ -53,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count -------------------------+------------+-------------+------------+-------------+-------------------- - regression_slot_stats1 | t | t | f | f | t - regression_slot_stats2 | t | t | t | t | t - regression_slot_stats3 | t | t | t | t | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count +------------------------+------------+-------------+----------------+-----------------+------------------+------------+----------------+-------------------- + regression_slot_stats1 | t | t | f | f | | | | t + regression_slot_stats2 | t | t | t | t | 1 | t | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t | t (3 rows) -- reset stats for all slots @@ -68,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count -------------------------+------------+-------------+------------+-------------+-------------------- - regression_slot_stats1 | t | t | f | f | t - regression_slot_stats2 | t | t | f | f | t - regression_slot_stats3 | t | t | f | f | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | plugin_sent_bytes | plugin_filtered_bytes | mem_exceeded_count +------------------------+------------+-------------+----------------+-----------------+------------------+-------------------+-----------------------+-------------------- + regression_slot_stats1 | t | t | f | f | | | | t + regression_slot_stats2 | t | t | f | f | | | | t + regression_slot_stats3 | t | t | f | f | | | | t (3 rows) -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) SELECT pg_stat_reset_replication_slot('do-not-exist'); ERROR: replication slot "do-not-exist" does not exist SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) -- spilling the xact @@ -121,20 +126,20 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, -- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de BEGIN; -SELECT slot_name FROM pg_stat_replication_slots; - slot_name ------------------------- - regression_slot_stats1 - regression_slot_stats2 - regression_slot_stats3 +SELECT slot_name, plugin FROM pg_stat_replication_slots; + slot_name | plugin +------------------------+--------------- + regression_slot_stats1 | test_decoding + regression_slot_stats2 | test_decoding + regression_slot_stats3 | test_decoding (3 rows) -SELECT slot_name FROM pg_stat_replication_slots; - slot_name ------------------------- - regression_slot_stats1 - regression_slot_stats2 - regression_slot_stats3 +SELECT slot_name, plugin FROM pg_stat_replication_slots; + slot_name | plugin +------------------------+--------------- + regression_slot_stats1 | test_decoding + regression_slot_stats2 | test_decoding + regression_slot_stats3 | test_decoding (3 rows) COMMIT; diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 6661dbcb85c3..d6bf3cde8b11 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -15,16 +15,22 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL, SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1'); SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1'); SELECT pg_stat_force_next_flush(); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + +-- total_wal_txns may vary based on the background activity but plugin_sent_txns +-- should always be 1 since the background transactions are always skipped. +-- Filtered bytes would be set only when there's a change that was passed to the +-- plugin but was filtered out. Depending upon the background transactions, +-- filtered bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; RESET logical_decoding_work_mem; -- reset stats for one slot, others should be unaffected SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; -- reset stats for all slots SELECT pg_stat_reset_replication_slot(NULL); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); @@ -46,8 +52,8 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, -- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de BEGIN; -SELECT slot_name FROM pg_stat_replication_slots; -SELECT slot_name FROM pg_stat_replication_slots; +SELECT slot_name, plugin FROM pg_stat_replication_slots; +SELECT slot_name, plugin FROM pg_stat_replication_slots; COMMIT; diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl index 0de62edb7d84..756fc691ed6f 100644 --- a/contrib/test_decoding/t/001_repl_stats.pl +++ b/contrib/test_decoding/t/001_repl_stats.pl @@ -23,10 +23,16 @@ sub test_slot_stats my ($node, $expected, $msg) = @_; + # If there are background transactions which are filtered out by the output + # plugin, plugin_filtered_bytes may be greater than 0. But it's not + # guaranteed that such transactions would be present. my $result = $node->safe_psql( 'postgres', qq[ - SELECT slot_name, total_txns > 0 AS total_txn, - total_bytes > 0 AS total_bytes + SELECT slot_name, total_wal_txns > 0 AS total_txn, + total_wal_bytes > 0 AS total_bytes, + plugin_sent_txns > 0 AS sent_txn, + plugin_sent_bytes > 0 AS sent_bytes, + plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name]); is($result, $expected, $msg); @@ -65,7 +71,7 @@ sub test_slot_stats 'postgres', qq[ SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots WHERE slot_name ~ 'regression_slot' - AND total_txns > 0 AND total_bytes > 0; + AND total_wal_txns > 0 AND total_wal_bytes > 0; ]) or die "Timed out while waiting for statistics to be updated"; # Test to drop one of the replication slot and verify replication statistics data is @@ -80,9 +86,9 @@ sub test_slot_stats # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t -regression_slot3|t|t), + qq(regression_slot1|t|t|t|t|t +regression_slot2|t|t|t|t|t +regression_slot3|t|t|t|t|t), 'check replication statistics are updated'); # Test to remove one of the replication slots and adjust @@ -104,8 +110,8 @@ sub test_slot_stats # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t), + qq(regression_slot1|t|t|t|t|t +regression_slot2|t|t|t|t|t), 'check replication statistics after removing the slot file'); # cleanup diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 36e77c69e1c1..d06f6c3f92b4 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -173,6 +173,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->only_local = false; ctx->output_plugin_private = data; + ctx->stats = palloc0(sizeof(OutputPluginStats)); opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; opt->receive_rewrites = false; @@ -310,6 +311,7 @@ static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) { OutputPluginPrepareWrite(ctx, last_write); + ctx->stats->sentTxns++; if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); else diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index b803a819cf1f..0bf9ffbfd286 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -938,6 +938,42 @@ typedef struct OutputPluginOptions needs to have a state, it can use ctx->output_plugin_private to store it. + + + The startup callback may initialize ctx->stats, + typically as follows, if it chooses to maintain and report statistics + about its activity in pg_stat_replication_slots. + +ctx->stats = palloc0(sizeof(OutputPluginStats)); + + where OutputPluginStats is defined as follows: + +typedef struct OutputPluginStats +{ + int64 sentTxns; + int64 sentBytes; + int64 filteredBytes; +} OutputPluginStats; + + sentTxns is the number of transactions sent downstream + by the output plugin. sentBytes is the amount of data, + in bytes, sent downstream by the output plugin. + filteredBytes is the size of changes, in bytes, that + are filtered out by the output plugin. + OutputPluginWrite will update + sentBytes if ctx->stats is + initialized by the output plugin. Function + ReorderBufferChangeSize may be used to find the size of + filtered ReorderBufferChange. + + + + + Once a plugin starts reporting and maintaining these statistics, it is + not expected that they will discontinue doing so. If they do, the result + may be misleading because of the cumulative nature of these statistics. + + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index f3bf527d5b4b..7f30094b2286 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1549,6 +1549,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + plugin text + + + The base name of the shared object containing the output plugin this + logical slot is using. This column is same as the one in + pg_replication_slots. + + + spill_txns bigint @@ -1637,19 +1648,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage - total_txns bigint + total_wal_txns bigint - Number of decoded transactions sent to the decoding output plugin for - this slot. This counts top-level transactions only, and is not incremented - for subtransactions. Note that this includes the transactions that are - streamed and/or spilled. + Number of decoded transactions from WAL sent to the decoding output + plugin for this slot. This counts top-level transactions only, and is + not incremented for subtransactions. Note that this includes the + transactions that are streamed and/or spilled. - total_bytesbigint + total_wal_bytesbigint Amount of transaction data decoded for sending transactions to the @@ -1659,6 +1670,53 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + plugin_filtered_bytes bigint + + + Amount of changes, from total_wal_bytes, filtered + out by the output plugin and not sent downstream. Please note that it + does not include the changes filtered before a change is sent to + the output plugin, e.g. the changes filtered by origin. The counter is + maintained by the output plugin mentioned in + plugin. It is NULL when statistics is not + initialized or immediately after a reset or when not maintained by the + output plugin. + + + + + + plugin_sent_txns bigint + + + Number of decoded transactions sent downstream for this slot. This + counts top-level transactions only, and is not incremented for + subtransactions. These transactions are subset of transactions sent to + the decoding plugin. Hence this count is expected to be less than or + equal to total_wal_txns. The counter is maintained + by the output plugin mentioned in plugin. It + is NULL when statistics is not initialized or immediately after a reset or + when not maintained by the output plugin. + + + + + + plugin_sent_bytesbigint + + + Amount of transaction changes sent downstream for this slot by the + output plugin after applying filtering and converting into its output + format. The counter is maintained by the output plugin mentioned in + plugin. It is NULL when statistics is not + initialized or immediately after a reset or when not maintained by the + output plugin. + + + + stats_reset timestamp with time zone diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index dec8df4f8ee6..defca1cf9acf 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1067,6 +1067,7 @@ CREATE VIEW pg_replication_slots AS CREATE VIEW pg_stat_replication_slots AS SELECT s.slot_name, + r.plugin, s.spill_txns, s.spill_count, s.spill_bytes, @@ -1074,8 +1075,11 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_count, s.stream_bytes, s.mem_exceeded_count, - s.total_txns, - s.total_bytes, + s.total_wal_txns, + s.total_wal_bytes, + s.plugin_filtered_bytes, + s.plugin_sent_txns, + s.plugin_sent_bytes, s.stats_reset FROM pg_replication_slots as r, LATERAL pg_stat_get_replication_slot(slot_name) as s diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 93ed2eb368e1..f0810f05153a 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1952,6 +1952,7 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; + OutputPluginStats *stats = ctx->stats; PgStat_StatReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. */ @@ -1959,7 +1960,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->memExceededCount <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " (%s) %" PRId64 " %" PRId64 " %" PRId64, rb, rb->spillTxns, rb->spillCount, @@ -1969,7 +1970,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamBytes, rb->memExceededCount, rb->totalTxns, - rb->totalBytes); + rb->totalBytes, + stats ? "plugin has stats" : "plugin has no stats", + stats ? stats->sentTxns : 0, + stats ? stats->sentBytes : 0, + stats ? stats->filteredBytes : 0); repSlotStat.spill_txns = rb->spillTxns; repSlotStat.spill_count = rb->spillCount; @@ -1978,8 +1983,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_count = rb->streamCount; repSlotStat.stream_bytes = rb->streamBytes; repSlotStat.mem_exceeded_count = rb->memExceededCount; - repSlotStat.total_txns = rb->totalTxns; - repSlotStat.total_bytes = rb->totalBytes; + repSlotStat.total_wal_txns = rb->totalTxns; + repSlotStat.total_wal_bytes = rb->totalBytes; + if (stats) + { + repSlotStat.plugin_has_stats = true; + repSlotStat.plugin_sent_txns = stats->sentTxns; + repSlotStat.plugin_sent_bytes = stats->sentBytes; + repSlotStat.plugin_filtered_bytes = stats->filteredBytes; + } + else + repSlotStat.plugin_has_stats = false; pgstat_report_replslot(ctx->slot, &repSlotStat); @@ -1992,6 +2006,12 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->memExceededCount = 0; rb->totalTxns = 0; rb->totalBytes = 0; + if (stats) + { + stats->sentTxns = 0; + stats->sentBytes = 0; + stats->filteredBytes = 0; + } } /* diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 25f890ddeeda..55e02e7ee217 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -65,6 +65,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi Datum values[3]; bool nulls[3]; DecodingOutputState *p; + int64 sentBytes = 0; /* SQL Datums can only be of a limited length... */ if (ctx->out->len > MaxAllocSize - VARHDRSZ) @@ -74,7 +75,9 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi memset(nulls, 0, sizeof(nulls)); values[0] = LSNGetDatum(lsn); + sentBytes += sizeof(XLogRecPtr); values[1] = TransactionIdGetDatum(xid); + sentBytes += sizeof(TransactionId); /* * Assert ctx->out is in database encoding when we're writing textual @@ -87,8 +90,17 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi /* ick, but cstring_to_text_with_len works for bytea perfectly fine */ values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len)); + sentBytes += ctx->out->len; tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); + + /* + * If output plugin has chosen to maintain its stats, update the amount of + * data sent downstream. + */ + if (ctx->stats) + ctx->stats->sentBytes += sentBytes; + p->returned_rows++; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b57aef9916de..d336ef3a51f0 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -310,7 +310,6 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t * memory accounting * --------------------------------------- */ -static Size ReorderBufferChangeSize(ReorderBufferChange *change); static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, @@ -4458,7 +4457,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* * Size of a change in memory. */ -static Size +Size ReorderBufferChangeSize(ReorderBufferChange *change) { Size sz = sizeof(ReorderBufferChange); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 942e1abdb584..367ba9efab39 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -473,6 +473,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, MemoryContextRegisterResetCallback(ctx->context, mcallback); ctx->output_plugin_private = data; + ctx->stats = palloc0(sizeof(OutputPluginStats)); /* This plugin uses binary protocol. */ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; @@ -614,6 +615,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); txndata->sent_begin_txn = true; + ctx->stats->sentTxns++; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -1492,7 +1494,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *new_slot = NULL; if (!is_publishable_relation(relation)) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } /* * Remember the xid for the change in streaming mode. We need to send xid @@ -1510,15 +1515,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } break; case REORDER_BUFFER_CHANGE_UPDATE: if (!relentry->pubactions.pubupdate) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } break; case REORDER_BUFFER_CHANGE_DELETE: if (!relentry->pubactions.pubdelete) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } /* * This is only possible if deletes are allowed even when replica @@ -1528,6 +1542,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!change->data.tp.oldtuple) { elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; } break; @@ -1583,7 +1598,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * of the row filter for old and new tuple. */ if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action)) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); goto cleanup; + } /* * Send BEGIN if we haven't yet. @@ -1603,7 +1621,11 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); - /* Send the data */ + /* + * Send the data. Even if we end up filtering some columns while sending + * the message, we won't filter the change, as a whole. Hence we don't + * increment filteredBytes. + */ switch (action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -1710,7 +1732,16 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, change->data.truncate.cascade, change->data.truncate.restart_seqs); OutputPluginWrite(ctx, true); + + /* + * Even if we filtered out some relations, we still send a TRUNCATE + * message for the remaining relations. Since the change, as a whole, + * is not filtered out we don't increment filteredBytes. + */ } + else + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); + MemoryContextSwitchTo(old); MemoryContextReset(data->context); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 548eafa7a732..b0a5d4da7a79 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1587,6 +1587,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len); + /* + * If output plugin maintains statistics, update the amount of data sent + * downstream. + */ + if (ctx->stats) + ctx->stats->sentBytes += ctx->out->len + 1; /* +1 for the 'd' */ + CHECK_FOR_INTERRUPTS(); /* Try to flush pending output to the client */ diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index d210c261ac65..42ca13bd76a2 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -88,6 +88,7 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re /* Update the replication slot statistics */ #define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld +#define REPLSLOT_SET_TO_ZERO(fld) statent->fld = 0 REPLSLOT_ACC(spill_txns); REPLSLOT_ACC(spill_count); REPLSLOT_ACC(spill_bytes); @@ -95,9 +96,23 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re REPLSLOT_ACC(stream_count); REPLSLOT_ACC(stream_bytes); REPLSLOT_ACC(mem_exceeded_count); - REPLSLOT_ACC(total_txns); - REPLSLOT_ACC(total_bytes); + REPLSLOT_ACC(total_wal_txns); + REPLSLOT_ACC(total_wal_bytes); + statent->plugin_has_stats = repSlotStat->plugin_has_stats; + if (repSlotStat->plugin_has_stats) + { + REPLSLOT_ACC(plugin_sent_txns); + REPLSLOT_ACC(plugin_sent_bytes); + REPLSLOT_ACC(plugin_filtered_bytes); + } + else + { + REPLSLOT_SET_TO_ZERO(plugin_sent_txns); + REPLSLOT_SET_TO_ZERO(plugin_sent_bytes); + REPLSLOT_SET_TO_ZERO(plugin_filtered_bytes); + } #undef REPLSLOT_ACC +#undef REPLSLOT_SET_TO_ZERO pgstat_unlock_entry(entry_ref); } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index a710508979e4..672b01a246dd 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2129,7 +2129,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 11 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 14 text *slotname_text = PG_GETARG_TEXT_P(0); NameData slotname; TupleDesc tupdesc; @@ -2156,11 +2156,17 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 8, "mem_exceeded_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_txns", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_wal_txns", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_wal_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "plugin_filtered_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "plugin_sent_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "plugin_sent_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 14, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2184,13 +2190,25 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[5] = Int64GetDatum(slotent->stream_count); values[6] = Int64GetDatum(slotent->stream_bytes); values[7] = Int64GetDatum(slotent->mem_exceeded_count); - values[8] = Int64GetDatum(slotent->total_txns); - values[9] = Int64GetDatum(slotent->total_bytes); + values[8] = Int64GetDatum(slotent->total_wal_txns); + values[9] = Int64GetDatum(slotent->total_wal_bytes); + if (slotent->plugin_has_stats) + { + values[10] = Int64GetDatum(slotent->plugin_filtered_bytes); + values[11] = Int64GetDatum(slotent->plugin_sent_txns); + values[12] = Int64GetDatum(slotent->plugin_sent_bytes); + } + else + { + nulls[10] = true; + nulls[11] = true; + nulls[12] = true; + } if (slotent->stat_reset_timestamp == 0) - nulls[10] = true; + nulls[13] = true; else - values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp); + values[13] = TimestampTzGetDatum(slotent->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9121a382f76b..e78d4f0ab1e3 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5691,9 +5691,9 @@ { oid => '6169', descr => 'statistics: information about replication slot', proname => 'pg_stat_get_replication_slot', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'text', - proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,stats_reset}', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_wal_txns,total_wal_bytes,plugin_filtered_bytes,plugin_sent_txns,plugin_sent_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, { oid => '6230', descr => 'statistics: check if a stats object exists', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 7ae503e71a27..427cf55d4b63 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -396,8 +396,12 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter stream_count; PgStat_Counter stream_bytes; PgStat_Counter mem_exceeded_count; - PgStat_Counter total_txns; - PgStat_Counter total_bytes; + PgStat_Counter total_wal_txns; + PgStat_Counter total_wal_bytes; + bool plugin_has_stats; + PgStat_Counter plugin_sent_txns; + PgStat_Counter plugin_sent_bytes; + PgStat_Counter plugin_filtered_bytes; TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 2e562bee5a9c..010c59f783d6 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -52,6 +52,7 @@ typedef struct LogicalDecodingContext OutputPluginCallbacks callbacks; OutputPluginOptions options; + OutputPluginStats *stats; /* * User specified options diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 8d4d5b71887d..4cc939e6c982 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -29,6 +29,19 @@ typedef struct OutputPluginOptions bool receive_rewrites; } OutputPluginOptions; +/* + * Statistics about the transactions decoded and sent downstream by the output + * plugin. + */ +typedef struct OutputPluginStats +{ + int64 sentTxns; /* number of transactions decoded and sent + * downstream */ + int64 sentBytes; /* amount of data decoded and sent downstream */ + int64 filteredBytes; /* amount of data from reorder buffer that was + * filtered out by the output plugin */ +} OutputPluginStats; + /* * Type of the shared library symbol _PG_output_plugin_init that is looked up * when loading an output plugin shared library. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 3cbe106a3c78..382eba66a765 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -718,6 +718,7 @@ extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids); extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert); +extern Size ReorderBufferChangeSize(ReorderBufferChange *change); extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 96b70b84d5e1..49bfe679249b 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -214,10 +214,10 @@ # Stats exist for stats test slot 1 is( $node_primary->safe_psql( 'postgres', - qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT total_wal_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) + qq(t|t|t), + qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) ); # Do reset of stats for stats test slot 1 @@ -235,10 +235,10 @@ is( $node_primary->safe_psql( 'postgres', - qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT stats_reset > '$reset1'::timestamptz, total_wal_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.) + qq(t|t|t), + qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_wal_bytes and plugin_sent_bytes were set to 0 and NULL respectively.) ); # Check that test slot 2 has NULL in reset timestamp diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index ebe2fae17898..5f4df30d65ae 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -577,7 +577,7 @@ sub wait_until_vacuum_can_remove qq[INSERT INTO decoding_test(x,y) SELECT 100,'100';]); $node_standby->poll_query_until('testdb', - qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] + qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] ) or die "replication slot stats of vacuum_full_activeslot not updated"; # This should trigger the conflict @@ -605,7 +605,7 @@ sub wait_until_vacuum_can_remove # Ensure that replication slot stats are not removed after invalidation. is( $node_standby->safe_psql( 'testdb', - qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] + qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] ), 't', 'replication slot stats not removed after invalidation'); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 77e25ca029e5..4bc5668f0fde 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2142,6 +2142,7 @@ pg_stat_replication| SELECT s.pid, JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, + r.plugin, s.spill_txns, s.spill_count, s.spill_bytes, @@ -2149,11 +2150,14 @@ pg_stat_replication_slots| SELECT s.slot_name, s.stream_count, s.stream_bytes, s.mem_exceeded_count, - s.total_txns, - s.total_bytes, + s.total_wal_txns, + s.total_wal_bytes, + s.plugin_filtered_bytes, + s.plugin_sent_txns, + s.plugin_sent_bytes, s.stats_reset FROM pg_replication_slots r, - LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, stats_reset) + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_wal_txns, total_wal_bytes, plugin_filtered_bytes, plugin_sent_txns, plugin_sent_bytes, stats_reset) WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT name, blks_zeroed, diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 430c1246d14c..7f37b6fe6c6e 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -124,6 +124,9 @@ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); is($result, qq(1002), 'check initial data was copied to subscriber'); +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_ins SELECT generate_series(1,50)"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20"); @@ -157,6 +160,14 @@ $node_publisher->wait_for_catchup('tap_sub'); +# Verify that plugin_filtered_bytes increases due to filtered update and delete +# operations on tab_ins. We cannot test the exact value since it may include +# changes from other concurrent transactions. +my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after DML filtering'); + $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl index 3d16c2a800de..c41ad317221d 100644 --- a/src/test/subscription/t/010_truncate.pl +++ b/src/test/subscription/t/010_truncate.pl @@ -69,6 +69,9 @@ # Wait for initial sync of all subscriptions $node_subscriber->wait_for_subscription_sync; +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'sub2'"); + # insert data to truncate $node_subscriber->safe_psql('postgres', @@ -98,6 +101,16 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')"); is($result, qq(101), 'truncate restarted identities'); +# All the DMLs above happen on tables that are subscribed to by sub1 and not +# sub2. plugin_filtered_bytes should get incremented for replication slot +# corresponding to the subscription sub2. We can not test the exact value of +# plugin_filtered_bytes because the counter is affected by background activity. +my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after publication level filtering'); +$initial_filtered_bytes = $final_filtered_bytes; + # test publication that does not replicate truncate $node_subscriber->safe_psql('postgres', @@ -107,6 +120,13 @@ $node_publisher->wait_for_catchup('sub2'); +# Truncate changes are filtered out at publication level itself. Make sure that +# the plugin_filtered_bytes is incremented. +$final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after truncate filtering'); + $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab2"); is($result, qq(3|1|3), 'truncate not replicated'); diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl index e2c836700535..039bf5ff5a02 100644 --- a/src/test/subscription/t/028_row_filter.pl +++ b/src/test/subscription/t/028_row_filter.pl @@ -579,6 +579,9 @@ # commands are for testing normal logical replication behavior. # # test row filter (INSERT, UPDATE, DELETE) +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')"); $node_publisher->safe_psql('postgres', @@ -612,6 +615,14 @@ $node_publisher->wait_for_catchup($appname); +# The changes which do not pass the row filter will be filtered. Make sure that +# the plugin_filtered_bytes reflects that. We can not test the exact value of +# plugin_filtered_bytes since it is affected by background activity. +my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after row filtering'); + # Check expected replicated rows for tab_rowfilter_2 # tap_pub_1 filter is: (c % 2 = 0) # tap_pub_2 filter is: (c % 3 = 0) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 018b5919cf66..3f1206a59c86 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1833,6 +1833,7 @@ OuterJoinClauseInfo OutputPluginCallbacks OutputPluginOptions OutputPluginOutputType +OutputPluginStats OverridingKind PACE_HEADER PACL