@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
5858 /* Remaining fields are invalid when conn is NULL: */
5959 int xact_depth ; /* 0 = no xact open, 1 = main xact open, 2 =
6060 * one level of subxact open, etc */
61+ bool xact_read_only ; /* xact r/o state */
6162 bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
6263 bool have_error ; /* have any subxacts aborted in this xact? */
6364 bool changing_xact_state ; /* xact state change in process */
@@ -84,6 +85,12 @@ static unsigned int prep_stmt_number = 0;
8485/* tracks whether any work is needed in callback functions */
8586static bool xact_got_connection = false;
8687
88+ /*
89+ * tracks the nesting level of the topmost read-only transaction determined
90+ * by GetTopReadOnlyTransactionNestLevel()
91+ */
92+ static int top_read_only_level = 0 ;
93+
8794/* custom wait event values, retrieved from shared memory */
8895static uint32 pgfdw_we_cleanup_result = 0 ;
8996static uint32 pgfdw_we_connect = 0 ;
@@ -374,6 +381,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
374381
375382 /* Reset all transient state fields, to be sure all are clean */
376383 entry -> xact_depth = 0 ;
384+ entry -> xact_read_only = false;
377385 entry -> have_prep_stmt = false;
378386 entry -> have_error = false;
379387 entry -> changing_xact_state = false;
@@ -848,29 +856,81 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
848856 * those scans. A disadvantage is that we can't provide sane emulation of
849857 * READ COMMITTED behavior --- it would be nice if we had some other way to
850858 * control which remote queries share a snapshot.
859+ *
860+ * Note also that we always start the remote transaction with the same
861+ * read/write and deferrable properties as the local transaction, and start
862+ * the remote subtransaction with the same read/write property as the local
863+ * subtransaction.
851864 */
852865static void
853866begin_remote_xact (ConnCacheEntry * entry )
854867{
855868 int curlevel = GetCurrentTransactionNestLevel ();
856869
857- /* Start main transaction if we haven't yet */
870+ /*
871+ * Set the nesting level of the topmost read-only transaction if the
872+ * current transaction is read-only and we haven't yet. Once it's set,
873+ * it's retained until that transaction is committed/aborted, and then
874+ * reset (see pgfdw_xact_callback and pgfdw_subxact_callback).
875+ */
876+ if (XactReadOnly )
877+ {
878+ if (top_read_only_level == 0 )
879+ top_read_only_level = GetTopReadOnlyTransactionNestLevel ();
880+ Assert (top_read_only_level > 0 );
881+ }
882+ else
883+ Assert (top_read_only_level == 0 );
884+
885+ /*
886+ * Start main transaction if we haven't yet; otherwise, change the
887+ * already-started remote transaction/subtransaction to read-only if the
888+ * local transaction/subtransaction have been done so after starting them
889+ * and we haven't yet.
890+ */
858891 if (entry -> xact_depth <= 0 )
859892 {
860- const char * sql ;
893+ StringInfoData sql ;
894+ bool ro = (top_read_only_level == 1 );
861895
862896 elog (DEBUG3 , "starting remote transaction on connection %p" ,
863897 entry -> conn );
864898
899+ initStringInfo (& sql );
900+ appendStringInfoString (& sql , "START TRANSACTION ISOLATION LEVEL " );
865901 if (IsolationIsSerializable ())
866- sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE" ;
902+ appendStringInfoString ( & sql , " SERIALIZABLE") ;
867903 else
868- sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ" ;
904+ appendStringInfoString (& sql , "REPEATABLE READ" );
905+ if (ro )
906+ appendStringInfoString (& sql , " READ ONLY" );
907+ if (XactDeferrable )
908+ appendStringInfoString (& sql , " DEFERRABLE" );
869909 entry -> changing_xact_state = true;
870- do_sql_command (entry -> conn , sql );
910+ do_sql_command (entry -> conn , sql . data );
871911 entry -> xact_depth = 1 ;
912+ if (ro )
913+ {
914+ Assert (!entry -> xact_read_only );
915+ entry -> xact_read_only = true;
916+ }
872917 entry -> changing_xact_state = false;
873918 }
919+ else if (!entry -> xact_read_only )
920+ {
921+ Assert (top_read_only_level == 0 ||
922+ entry -> xact_depth <= top_read_only_level );
923+ if (entry -> xact_depth == top_read_only_level )
924+ {
925+ entry -> changing_xact_state = true;
926+ do_sql_command (entry -> conn , "SET transaction_read_only = on" );
927+ entry -> xact_read_only = true;
928+ entry -> changing_xact_state = false;
929+ }
930+ }
931+ else
932+ Assert (top_read_only_level > 0 &&
933+ entry -> xact_depth >= top_read_only_level );
874934
875935 /*
876936 * If we're in a subtransaction, stack up savepoints to match our level.
@@ -879,12 +939,21 @@ begin_remote_xact(ConnCacheEntry *entry)
879939 */
880940 while (entry -> xact_depth < curlevel )
881941 {
882- char sql [64 ];
942+ StringInfoData sql ;
943+ bool ro = (entry -> xact_depth + 1 == top_read_only_level );
883944
884- snprintf (sql , sizeof (sql ), "SAVEPOINT s%d" , entry -> xact_depth + 1 );
945+ initStringInfo (& sql );
946+ appendStringInfo (& sql , "SAVEPOINT s%d" , entry -> xact_depth + 1 );
947+ if (ro )
948+ appendStringInfoString (& sql , "; SET transaction_read_only = on" );
885949 entry -> changing_xact_state = true;
886- do_sql_command (entry -> conn , sql );
950+ do_sql_command (entry -> conn , sql . data );
887951 entry -> xact_depth ++ ;
952+ if (ro )
953+ {
954+ Assert (!entry -> xact_read_only );
955+ entry -> xact_read_only = true;
956+ }
888957 entry -> changing_xact_state = false;
889958 }
890959}
@@ -1189,6 +1258,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
11891258
11901259 /* Also reset cursor numbering for next transaction */
11911260 cursor_number = 0 ;
1261+
1262+ /* Likewise for top_read_only_level */
1263+ top_read_only_level = 0 ;
11921264}
11931265
11941266/*
@@ -1287,6 +1359,10 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
12871359 false);
12881360 }
12891361 }
1362+
1363+ /* If in the topmost read-only transaction, reset top_read_only_level */
1364+ if (curlevel == top_read_only_level )
1365+ top_read_only_level = 0 ;
12901366}
12911367
12921368/*
@@ -1389,6 +1465,9 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
13891465 /* Reset state to show we're out of a transaction */
13901466 entry -> xact_depth = 0 ;
13911467
1468+ /* Reset xact r/o state */
1469+ entry -> xact_read_only = false;
1470+
13921471 /*
13931472 * If the connection isn't in a good idle state, it is marked as
13941473 * invalid or keep_connections option of its server is disabled, then
@@ -1409,6 +1488,10 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
14091488 {
14101489 /* Reset state to show we're out of a subtransaction */
14111490 entry -> xact_depth -- ;
1491+
1492+ /* If in the topmost read-only transaction, reset xact r/o state */
1493+ if (entry -> xact_depth + 1 == top_read_only_level )
1494+ entry -> xact_read_only = false;
14121495 }
14131496}
14141497
0 commit comments