Skip to content

Commit 1c8eba7

Browse files
author
Commitfest Bot
committed
[CF 5189] v9 - Expose the acquired_by parameter to the pg_replication_origin_session_setup function
This branch was automatically generated by a robot using patches from an email thread registered at: https://commitfest.postgresql.org/patch/5189 The branch will be overwritten each time a new patch version is posted to the thread, and also periodically to check for bitrot caused by changes on the master branch. Patch(es): https://www.postgresql.org/message-id/OSCPR01MB14966B68C2148C1BC462AA906F516A@OSCPR01MB14966.jpnprd01.prod.outlook.com Author(s): Doruk Yılmaz
2 parents 2e66cae + 5730cbd commit 1c8eba7

File tree

10 files changed

+193
-9
lines changed

10 files changed

+193
-9
lines changed

contrib/test_decoding/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
99
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
1010
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
1111
twophase_snapshot slot_creation_error catalog_change_snapshot \
12-
skip_snapshot_restore invalidation_distribution
12+
skip_snapshot_restore invalidation_distribution repl_origin
1313

1414
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
1515
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
4+
step s0_setup: SELECT pg_replication_origin_session_setup('origin');
5+
pg_replication_origin_session_setup
6+
-----------------------------------
7+
8+
(1 row)
9+
10+
step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
11+
pg_replication_origin_session_is_setup
12+
--------------------------------------
13+
t
14+
(1 row)
15+
16+
step s1_setup:
17+
SELECT pg_replication_origin_session_setup('origin', pid)
18+
FROM pg_stat_activity
19+
WHERE application_name = 'isolation/repl_origin/s0';
20+
21+
pg_replication_origin_session_setup
22+
-----------------------------------
23+
24+
(1 row)
25+
26+
step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
27+
pg_replication_origin_session_is_setup
28+
--------------------------------------
29+
t
30+
(1 row)
31+
32+
step s0_add_message:
33+
SELECT 1
34+
FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
35+
36+
?column?
37+
--------
38+
1
39+
(1 row)
40+
41+
step s0_store_lsn:
42+
INSERT INTO local_lsn_store
43+
SELECT 0, local_lsn FROM pg_replication_origin_status;
44+
45+
step s1_add_message:
46+
SELECT 1
47+
FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
48+
49+
?column?
50+
--------
51+
1
52+
(1 row)
53+
54+
step s1_store_lsn:
55+
INSERT INTO local_lsn_store
56+
SELECT 1, local_lsn FROM pg_replication_origin_status;
57+
58+
step s0_compare:
59+
SELECT s0.lsn < s1.lsn
60+
FROM local_lsn_store as s0, local_lsn_store as s1
61+
WHERE s0.session = 0 AND s1.session = 1;
62+
63+
?column?
64+
--------
65+
t
66+
(1 row)
67+
68+
step s0_reset: SELECT pg_replication_origin_session_reset();
69+
pg_replication_origin_session_reset
70+
-----------------------------------
71+
72+
(1 row)
73+
74+
step s1_reset: SELECT pg_replication_origin_session_reset();
75+
pg_replication_origin_session_reset
76+
-----------------------------------
77+
78+
(1 row)
79+

contrib/test_decoding/expected/replorigin.out

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
4141
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
4242
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
4343
DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists.
44+
-- ensure inactive origin cannot be set as session one if pid is specified
45+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
46+
ERROR: replication origin with ID 1 is inactive but PID -1 was specified
4447
--ensure deletions work (once)
4548
SELECT pg_replication_origin_create('regress_test_decoding: temp');
4649
pg_replication_origin_create

contrib/test_decoding/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ tests += {
6464
'slot_creation_error',
6565
'skip_snapshot_restore',
6666
'invalidation_distribution',
67+
'repl_origin',
6768
],
6869
'regress_args': [
6970
'--temp-config', files('logical.conf'),
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Test multi-session replication origin manipulations; ensure local_lsn can be
2+
# updated by all attached sessions.
3+
4+
setup
5+
{
6+
SELECT pg_replication_origin_create('origin');
7+
CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
8+
}
9+
10+
teardown
11+
{
12+
SELECT pg_replication_origin_drop('origin');
13+
DROP TABLE local_lsn_store;
14+
}
15+
16+
session "s0"
17+
setup { SET synchronous_commit = on; }
18+
step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
19+
step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
20+
step "s0_add_message" {
21+
SELECT 1
22+
FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
23+
}
24+
step "s0_store_lsn" {
25+
INSERT INTO local_lsn_store
26+
SELECT 0, local_lsn FROM pg_replication_origin_status;
27+
}
28+
step "s0_compare" {
29+
SELECT s0.lsn < s1.lsn
30+
FROM local_lsn_store as s0, local_lsn_store as s1
31+
WHERE s0.session = 0 AND s1.session = 1;
32+
}
33+
step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
34+
35+
session "s1"
36+
setup { SET synchronous_commit = on; }
37+
step "s1_setup" {
38+
SELECT pg_replication_origin_session_setup('origin', pid)
39+
FROM pg_stat_activity
40+
WHERE application_name = 'isolation/repl_origin/s0';
41+
}
42+
step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
43+
step "s1_add_message" {
44+
SELECT 1
45+
FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
46+
}
47+
step "s1_store_lsn" {
48+
INSERT INTO local_lsn_store
49+
SELECT 1, local_lsn FROM pg_replication_origin_status;
50+
}
51+
step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
52+
53+
# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
54+
# commits a transaction and store the local_lsn of the replication origin.
55+
# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
56+
permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"

contrib/test_decoding/sql/replorigin.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
2626
-- ensure duplicate creations fail
2727
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
2828

29+
-- ensure inactive origin cannot be set as session one if pid is specified
30+
SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
31+
2932
--ensure deletions work (once)
3033
SELECT pg_replication_origin_create('regress_test_decoding: temp');
3134
SELECT pg_replication_origin_drop('regress_test_decoding: temp');

doc/src/sgml/func/func-admin.sgml

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1315,15 +1315,34 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
13151315
<indexterm>
13161316
<primary>pg_replication_origin_session_setup</primary>
13171317
</indexterm>
1318-
<function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
1318+
<function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
13191319
<returnvalue>void</returnvalue>
13201320
</para>
13211321
<para>
13221322
Marks the current session as replaying from the given
13231323
origin, allowing replay progress to be tracked.
13241324
Can only be used if no origin is currently selected.
13251325
Use <function>pg_replication_origin_session_reset</function> to undo.
1326-
</para></entry>
1326+
If multiple processes can safely use the same replication origin (for
1327+
example, parallel apply processes), the optional <parameter>pid</parameter>
1328+
parameter can be used to specify the process ID of the first process.
1329+
The first process must provide <parameter>pid</parameter> equals to
1330+
<literal>0</literal> and the other processes that share the same
1331+
replication origin should provide the process ID of the first process.
1332+
</para>
1333+
<caution>
1334+
<para>
1335+
When multiple processes share the same replication origin, it is critical
1336+
to maintain commit order to prevent data inconsistency. While processes
1337+
may send operations out of order, they must commit transactions in the
1338+
correct sequence to ensure proper replication consistency. The recommended workflow
1339+
for each worker is: set up the replication origin session with the first process's PID,
1340+
apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
1341+
with the LSN and commit timestamp before committing, then commit the
1342+
transaction only if everything succeeded.
1343+
</para>
1344+
</caution>
1345+
</entry>
13271346
</row>
13281347

13291348
<row>

src/backend/catalog/system_functions.sql

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
650650
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
651651
AS 'pg_stat_reset_slru';
652652

653+
CREATE OR REPLACE FUNCTION
654+
pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
655+
RETURNS void
656+
LANGUAGE INTERNAL
657+
STRICT VOLATILE PARALLEL UNSAFE
658+
AS 'pg_replication_origin_session_setup';
659+
653660
--
654661
-- The default permissions for functions mean that anyone can execute them.
655662
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
751758

752759
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
753760

754-
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
761+
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
755762

756763
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
757764

src/backend/replication/logical/origin.c

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,14 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
11671167
curstate->roident, curstate->acquired_by)));
11681168
}
11691169

1170+
else if (curstate->acquired_by != acquired_by)
1171+
{
1172+
ereport(ERROR,
1173+
(errcode(ERRCODE_OBJECT_IN_USE),
1174+
errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175+
node, acquired_by)));
1176+
}
1177+
11701178
/* ok, found slot */
11711179
session_replication_state = curstate;
11721180
break;
@@ -1181,6 +1189,13 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
11811189
errhint("Increase \"max_active_replication_origins\" and try again.")));
11821190
else if (session_replication_state == NULL)
11831191
{
1192+
/* The origin is not used but PID is specified */
1193+
if(acquired_by)
1194+
ereport(ERROR,
1195+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1196+
errmsg("replication origin with ID %d is inactive but PID %d was specified",
1197+
node, acquired_by)));
1198+
11841199
/* initialize new slot */
11851200
session_replication_state = &replication_states[free_slot];
11861201
Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
@@ -1193,9 +1208,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
11931208

11941209
if (acquired_by == 0)
11951210
session_replication_state->acquired_by = MyProcPid;
1196-
else if (session_replication_state->acquired_by != acquired_by)
1197-
elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1198-
node, acquired_by);
1211+
else
1212+
Assert(session_replication_state->acquired_by == acquired_by);
11991213

12001214
LWLockRelease(ReplicationOriginLock);
12011215

@@ -1374,12 +1388,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
13741388
{
13751389
char *name;
13761390
RepOriginId origin;
1391+
int pid;
13771392

13781393
replorigin_check_prerequisites(true, false);
13791394

13801395
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
13811396
origin = replorigin_by_name(name, false);
1382-
replorigin_session_setup(origin, 0);
1397+
pid = PG_GETARG_INT32(1);
1398+
replorigin_session_setup(origin, pid);
13831399

13841400
replorigin_session_origin = origin;
13851401

src/include/catalog/pg_proc.dat

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12235,7 +12235,7 @@
1223512235
{ oid => '6006',
1223612236
descr => 'configure session to maintain replication progress tracking for the passed in origin',
1223712237
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
12238-
proparallel => 'u', prorettype => 'void', proargtypes => 'text',
12238+
proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
1223912239
prosrc => 'pg_replication_origin_session_setup' },
1224012240

1224112241
{ oid => '6007', descr => 'teardown configured replication progress tracking',

0 commit comments

Comments
 (0)