2525#include "common/connect.h"
2626#include "funcapi.h"
2727#include "libpq-fe.h"
28+ #include "libpq/libpq-be-fe-helpers.h"
2829#include "mb/pg_wchar.h"
2930#include "miscadmin.h"
3031#include "pgstat.h"
@@ -113,8 +114,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
113114};
114115
115116/* Prototypes for private functions */
116- static PGresult * libpqrcv_PQexec (PGconn * streamConn , const char * query );
117- static PGresult * libpqrcv_PQgetResult (PGconn * streamConn );
118117static char * stringlist_to_identifierstr (PGconn * conn , List * strings );
119118
120119/*
@@ -148,7 +147,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
148147 bool must_use_password , const char * appname , char * * err )
149148{
150149 WalReceiverConn * conn ;
151- PostgresPollingStatusType status ;
152150 const char * keys [6 ];
153151 const char * vals [6 ];
154152 int i = 0 ;
@@ -214,56 +212,17 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
214212 Assert (i < lengthof (keys ));
215213
216214 conn = palloc0 (sizeof (WalReceiverConn ));
217- conn -> streamConn = PQconnectStartParams (keys , vals ,
218- /* expand_dbname = */ true);
219- if (PQstatus (conn -> streamConn ) == CONNECTION_BAD )
220- goto bad_connection_errmsg ;
221-
222- /*
223- * Poll connection until we have OK or FAILED status.
224- *
225- * Per spec for PQconnectPoll, first wait till socket is write-ready.
226- */
227- status = PGRES_POLLING_WRITING ;
228- do
229- {
230- int io_flag ;
231- int rc ;
232-
233- if (status == PGRES_POLLING_READING )
234- io_flag = WL_SOCKET_READABLE ;
235- #ifdef WIN32
236- /* Windows needs a different test while waiting for connection-made */
237- else if (PQstatus (conn -> streamConn ) == CONNECTION_STARTED )
238- io_flag = WL_SOCKET_CONNECTED ;
239- #endif
240- else
241- io_flag = WL_SOCKET_WRITEABLE ;
242-
243- rc = WaitLatchOrSocket (MyLatch ,
244- WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag ,
245- PQsocket (conn -> streamConn ),
246- 0 ,
247- WAIT_EVENT_LIBPQWALRECEIVER_CONNECT );
248-
249- /* Interrupted? */
250- if (rc & WL_LATCH_SET )
251- {
252- ResetLatch (MyLatch );
253- ProcessWalRcvInterrupts ();
254- }
255-
256- /* If socket is ready, advance the libpq state machine */
257- if (rc & io_flag )
258- status = PQconnectPoll (conn -> streamConn );
259- } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED );
215+ conn -> streamConn =
216+ libpqsrv_connect_params (keys , vals ,
217+ /* expand_dbname = */ true,
218+ WAIT_EVENT_LIBPQWALRECEIVER_CONNECT );
260219
261220 if (PQstatus (conn -> streamConn ) != CONNECTION_OK )
262221 goto bad_connection_errmsg ;
263222
264223 if (must_use_password && !PQconnectionUsedPassword (conn -> streamConn ))
265224 {
266- PQfinish (conn -> streamConn );
225+ libpqsrv_disconnect (conn -> streamConn );
267226 pfree (conn );
268227
269228 ereport (ERROR ,
@@ -281,8 +240,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
281240 {
282241 PGresult * res ;
283242
284- res = libpqrcv_PQexec (conn -> streamConn ,
285- ALWAYS_SECURE_SEARCH_PATH_SQL );
243+ res = libpqsrv_exec (conn -> streamConn ,
244+ ALWAYS_SECURE_SEARCH_PATH_SQL ,
245+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
286246 if (PQresultStatus (res ) != PGRES_TUPLES_OK )
287247 {
288248 PQclear (res );
@@ -303,7 +263,7 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
303263
304264 /* error path, error already set */
305265bad_connection :
306- PQfinish (conn -> streamConn );
266+ libpqsrv_disconnect (conn -> streamConn );
307267 pfree (conn );
308268 return NULL ;
309269}
@@ -454,7 +414,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
454414 * Get the system identifier and timeline ID as a DataRow message from the
455415 * primary server.
456416 */
457- res = libpqrcv_PQexec (conn -> streamConn , "IDENTIFY_SYSTEM" );
417+ res = libpqsrv_exec (conn -> streamConn ,
418+ "IDENTIFY_SYSTEM" ,
419+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
458420 if (PQresultStatus (res ) != PGRES_TUPLES_OK )
459421 {
460422 PQclear (res );
@@ -631,7 +593,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
631593 options -> proto .physical .startpointTLI );
632594
633595 /* Start streaming. */
634- res = libpqrcv_PQexec (conn -> streamConn , cmd .data );
596+ res = libpqsrv_exec (conn -> streamConn ,
597+ cmd .data ,
598+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
635599 pfree (cmd .data );
636600
637601 if (PQresultStatus (res ) == PGRES_COMMAND_OK )
@@ -661,7 +625,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
661625 PGresult * res ;
662626
663627 /*
664- * Send copy-end message. As in libpqrcv_PQexec , this could theoretically
628+ * Send copy-end message. As in libpqsrv_exec , this could theoretically
665629 * block, but the risk seems small.
666630 */
667631 if (PQputCopyEnd (conn -> streamConn , NULL ) <= 0 ||
@@ -681,7 +645,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
681645 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
682646 * also possible in case we aborted the copy in mid-stream.
683647 */
684- res = libpqrcv_PQgetResult (conn -> streamConn );
648+ res = libpqsrv_get_result (conn -> streamConn ,
649+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
685650 if (PQresultStatus (res ) == PGRES_TUPLES_OK )
686651 {
687652 /*
@@ -696,7 +661,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
696661 PQclear (res );
697662
698663 /* the result set should be followed by CommandComplete */
699- res = libpqrcv_PQgetResult (conn -> streamConn );
664+ res = libpqsrv_get_result (conn -> streamConn ,
665+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
700666 }
701667 else if (PQresultStatus (res ) == PGRES_COPY_OUT )
702668 {
@@ -710,7 +676,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
710676 pchomp (PQerrorMessage (conn -> streamConn )))));
711677
712678 /* CommandComplete should follow */
713- res = libpqrcv_PQgetResult (conn -> streamConn );
679+ res = libpqsrv_get_result (conn -> streamConn ,
680+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
714681 }
715682
716683 if (PQresultStatus (res ) != PGRES_COMMAND_OK )
@@ -721,7 +688,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
721688 PQclear (res );
722689
723690 /* Verify that there are no more results */
724- res = libpqrcv_PQgetResult (conn -> streamConn );
691+ res = libpqsrv_get_result (conn -> streamConn ,
692+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
725693 if (res != NULL )
726694 ereport (ERROR ,
727695 (errcode (ERRCODE_PROTOCOL_VIOLATION ),
@@ -746,7 +714,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
746714 * Request the primary to send over the history file for given timeline.
747715 */
748716 snprintf (cmd , sizeof (cmd ), "TIMELINE_HISTORY %u" , tli );
749- res = libpqrcv_PQexec (conn -> streamConn , cmd );
717+ res = libpqsrv_exec (conn -> streamConn ,
718+ cmd ,
719+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
750720 if (PQresultStatus (res ) != PGRES_TUPLES_OK )
751721 {
752722 PQclear (res );
@@ -776,114 +746,13 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
776746 PQclear (res );
777747}
778748
779- /*
780- * Send a query and wait for the results by using the asynchronous libpq
781- * functions and socket readiness events.
782- *
783- * The function is modeled on libpqsrv_exec(), with the behavior difference
784- * being that it calls ProcessWalRcvInterrupts(). As an optimization, it
785- * skips try/catch, since all errors terminate the process.
786- *
787- * May return NULL, rather than an error result, on failure.
788- */
789- static PGresult *
790- libpqrcv_PQexec (PGconn * streamConn , const char * query )
791- {
792- PGresult * lastResult = NULL ;
793-
794- /*
795- * PQexec() silently discards any prior query results on the connection.
796- * This is not required for this function as it's expected that the caller
797- * (which is this library in all cases) will behave correctly and we don't
798- * have to be backwards compatible with old libpq.
799- */
800-
801- /*
802- * Submit the query. Since we don't use non-blocking mode, this could
803- * theoretically block. In practice, since we don't send very long query
804- * strings, the risk seems negligible.
805- */
806- if (!PQsendQuery (streamConn , query ))
807- return NULL ;
808-
809- for (;;)
810- {
811- /* Wait for, and collect, the next PGresult. */
812- PGresult * result ;
813-
814- result = libpqrcv_PQgetResult (streamConn );
815- if (result == NULL )
816- break ; /* query is complete, or failure */
817-
818- /*
819- * Emulate PQexec()'s behavior of returning the last result when there
820- * are many. We are fine with returning just last error message.
821- */
822- PQclear (lastResult );
823- lastResult = result ;
824-
825- if (PQresultStatus (lastResult ) == PGRES_COPY_IN ||
826- PQresultStatus (lastResult ) == PGRES_COPY_OUT ||
827- PQresultStatus (lastResult ) == PGRES_COPY_BOTH ||
828- PQstatus (streamConn ) == CONNECTION_BAD )
829- break ;
830- }
831-
832- return lastResult ;
833- }
834-
835- /*
836- * Perform the equivalent of PQgetResult(), but watch for interrupts.
837- */
838- static PGresult *
839- libpqrcv_PQgetResult (PGconn * streamConn )
840- {
841- /*
842- * Collect data until PQgetResult is ready to get the result without
843- * blocking.
844- */
845- while (PQisBusy (streamConn ))
846- {
847- int rc ;
848-
849- /*
850- * We don't need to break down the sleep into smaller increments,
851- * since we'll get interrupted by signals and can handle any
852- * interrupts here.
853- */
854- rc = WaitLatchOrSocket (MyLatch ,
855- WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
856- WL_LATCH_SET ,
857- PQsocket (streamConn ),
858- 0 ,
859- WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
860-
861- /* Interrupted? */
862- if (rc & WL_LATCH_SET )
863- {
864- ResetLatch (MyLatch );
865- ProcessWalRcvInterrupts ();
866- }
867-
868- /* Consume whatever data is available from the socket */
869- if (PQconsumeInput (streamConn ) == 0 )
870- {
871- /* trouble; return NULL */
872- return NULL ;
873- }
874- }
875-
876- /* Now we can collect and return the next PGresult */
877- return PQgetResult (streamConn );
878- }
879-
880749/*
881750 * Disconnect connection to primary, if any.
882751 */
883752static void
884753libpqrcv_disconnect (WalReceiverConn * conn )
885754{
886- PQfinish (conn -> streamConn );
755+ libpqsrv_disconnect (conn -> streamConn );
887756 PQfreemem (conn -> recvBuf );
888757 pfree (conn );
889758}
@@ -937,13 +806,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
937806 {
938807 PGresult * res ;
939808
940- res = libpqrcv_PQgetResult (conn -> streamConn );
809+ res = libpqsrv_get_result (conn -> streamConn ,
810+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
941811 if (PQresultStatus (res ) == PGRES_COMMAND_OK )
942812 {
943813 PQclear (res );
944814
945815 /* Verify that there are no more results. */
946- res = libpqrcv_PQgetResult (conn -> streamConn );
816+ res = libpqsrv_get_result (conn -> streamConn ,
817+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
947818 if (res != NULL )
948819 {
949820 PQclear (res );
@@ -1094,7 +965,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
1094965 appendStringInfoString (& cmd , " PHYSICAL RESERVE_WAL" );
1095966 }
1096967
1097- res = libpqrcv_PQexec (conn -> streamConn , cmd .data );
968+ res = libpqsrv_exec (conn -> streamConn ,
969+ cmd .data ,
970+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
1098971 pfree (cmd .data );
1099972
1100973 if (PQresultStatus (res ) != PGRES_TUPLES_OK )
@@ -1147,7 +1020,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
11471020
11481021 appendStringInfoString (& cmd , " );" );
11491022
1150- res = libpqrcv_PQexec (conn -> streamConn , cmd .data );
1023+ res = libpqsrv_exec (conn -> streamConn , cmd .data ,
1024+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
11511025 pfree (cmd .data );
11521026
11531027 if (PQresultStatus (res ) != PGRES_COMMAND_OK )
@@ -1214,7 +1088,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
12141088 {
12151089 char * cstrs [MaxTupleAttributeNumber ];
12161090
1217- ProcessWalRcvInterrupts ();
1091+ CHECK_FOR_INTERRUPTS ();
12181092
12191093 /* Do the allocations in temporary context. */
12201094 oldcontext = MemoryContextSwitchTo (rowcontext );
@@ -1260,7 +1134,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
12601134 (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
12611135 errmsg ("the query interface requires a database connection" )));
12621136
1263- pgres = libpqrcv_PQexec (conn -> streamConn , query );
1137+ pgres = libpqsrv_exec (conn -> streamConn ,
1138+ query ,
1139+ WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE );
12641140
12651141 switch (PQresultStatus (pgres ))
12661142 {
0 commit comments