150150#include "utils/ps_status.h"
151151#include "utils/snapmgr.h"
152152#include "utils/timestamp.h"
153+ #include "port/atomics.h"
153154
154155
/*
 * Per-listener notification state, stored in shared memory as a
 * pg_atomic_uint32 (see QueueBackendStatus.state).
 *
 * Notifiers (SignalBackends) move a listener from IDLE or PROCESSING to
 * SIGNALLED; the listener itself (ProcessIncomingNotify) moves from
 * SIGNALLED to PROCESSING and from PROCESSING back to IDLE.  The numeric
 * values are spelled out because they are what the atomic actually holds.
 */
typedef enum AsyncListenerState
{
	/* Backend is idle, waiting for signal */
	ASYNC_STATE_IDLE = 0,

	/* Backend has been signaled, will process soon */
	ASYNC_STATE_SIGNALLED = 1,

	/* Backend is actively processing notifications */
	ASYNC_STATE_PROCESSING = 2
} AsyncListenerState;
165+
155166/*
156167 * Maximum size of a NOTIFY payload, including terminating NUL. This
157168 * must be kept small enough so that a notification message fits on one
@@ -246,6 +257,7 @@ typedef struct QueueBackendStatus
246257 Oid dboid ; /* backend's database OID, or InvalidOid */
247258 ProcNumber nextListener ; /* id of next listener, or INVALID_PROC_NUMBER */
248259 QueuePosition pos ; /* backend has read queue up to here */
260+ pg_atomic_uint32 state ; /* async state machine state */
249261} QueueBackendStatus ;
250262
251263/*
@@ -301,6 +313,7 @@ static AsyncQueueControl *asyncQueueControl;
/*
 * Accessors for fields of one backend's slot in the shared listener array.
 * "i" is the slot index (callers in this file pass a ProcNumber).  The
 * opening parenthesis must follow the macro name with no whitespace in
 * between; otherwise the preprocessor treats the definition as an
 * object-like macro and "(i)" becomes part of the replacement text.
 */
#define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
/* state is a pg_atomic_uint32: read and write it only via pg_atomic_* calls */
#define QUEUE_BACKEND_STATE(i)		(asyncQueueControl->backend[i].state)
304317
305318/*
306319 * The SLRU buffer area through which we access the notification queue
@@ -405,12 +418,10 @@ static NotificationList *pendingNotifies = NULL;
405418
406419/*
407420 * Inbound notifications are initially processed by HandleNotifyInterrupt(),
408- * called from inside a signal handler. That just sets the
409- * notifyInterruptPending flag and sets the process
421+ * called from inside a signal handler. That just sets the process
410422 * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
411423 * actually deal with the interrupt.
412424 */
413- volatile sig_atomic_t notifyInterruptPending = false;
414425
415426/* True if we've registered an on_shmem_exit cleanup */
416427static bool unlistenExitRegistered = false;
@@ -527,6 +538,7 @@ AsyncShmemInit(void)
527538 QUEUE_BACKEND_DBOID (i ) = InvalidOid ;
528539 QUEUE_NEXT_LISTENER (i ) = INVALID_PROC_NUMBER ;
529540 SET_QUEUE_POS (QUEUE_BACKEND_POS (i ), 0 , 0 );
541+ pg_atomic_init_u32 (& QUEUE_BACKEND_STATE (i ), ASYNC_STATE_IDLE );
530542 }
531543 }
532544
@@ -1099,6 +1111,8 @@ Exec_ListenPreCommit(void)
10991111 QUEUE_BACKEND_POS (MyProcNumber ) = max ;
11001112 QUEUE_BACKEND_PID (MyProcNumber ) = MyProcPid ;
11011113 QUEUE_BACKEND_DBOID (MyProcNumber ) = MyDatabaseId ;
1114+ /* Initialize the atomic state to IDLE */
1115+ pg_atomic_write_u32 (& QUEUE_BACKEND_STATE (MyProcNumber ), ASYNC_STATE_IDLE );
11021116 /* Insert backend into list of listeners at correct position */
11031117 if (prevListener != INVALID_PROC_NUMBER )
11041118 {
@@ -1242,6 +1256,8 @@ asyncQueueUnregister(void)
12421256 /* Mark our entry as invalid */
12431257 QUEUE_BACKEND_PID (MyProcNumber ) = InvalidPid ;
12441258 QUEUE_BACKEND_DBOID (MyProcNumber ) = InvalidOid ;
1259+ /* Reset state to IDLE to prevent zombie listeners */
1260+ pg_atomic_write_u32 (& QUEUE_BACKEND_STATE (MyProcNumber ), ASYNC_STATE_IDLE );
12451261 /* and remove it from the list */
12461262 if (QUEUE_FIRST_LISTENER == MyProcNumber )
12471263 QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER (MyProcNumber );
@@ -1634,25 +1650,84 @@ SignalBackends(void)
16341650 for (int i = 0 ; i < count ; i ++ )
16351651 {
16361652 int32 pid = pids [i ];
1653+ ProcNumber procno = procnos [i ];
1654+ uint32 expected ;
1655+ bool signal_needed = false;
16371656
16381657 /*
1639- * If we are signaling our own process, no need to involve the kernel;
1640- * just set the flag directly.
1658+ * Implement state machine transitions for the notifier.
1659+ * We use a loop to handle race conditions where the state
1660+ * changes between our read and the CAS operation.
16411661 */
1642- if (pid == MyProcPid )
1662+ uint32 current_state = pg_atomic_read_membarrier_u32 (& QUEUE_BACKEND_STATE (procno ));
1663+
1664+ switch (current_state )
16431665 {
1644- notifyInterruptPending = true;
1645- continue ;
1666+ case ASYNC_STATE_IDLE :
1667+ /* Try to transition from IDLE to SIGNALLED */
1668+ expected = ASYNC_STATE_IDLE ;
1669+ if (pg_atomic_compare_exchange_u32 (& QUEUE_BACKEND_STATE (procno ),
1670+ & expected ,
1671+ ASYNC_STATE_SIGNALLED ))
1672+ {
1673+ /* Success - need to send signal */
1674+ signal_needed = true;
1675+ if (Trace_notify )
1676+ elog (DEBUG1 , "SignalBackends: transitioned backend %d from IDLE to SIGNALLED" , pid );
1677+ }
1678+ /* Another notifier already signaled - we're done */
1679+ break ;
1680+
1681+ case ASYNC_STATE_SIGNALLED :
1682+ /* Backend is already signaled - nothing to do */
1683+ if (Trace_notify )
1684+ elog (DEBUG1 , "SignalBackends: backend %d already in SIGNALLED state, skipping" , pid );
1685+ break ;
1686+
1687+ case ASYNC_STATE_PROCESSING :
1688+ /* Try to transition from PROCESSING to SIGNALLED */
1689+ expected = ASYNC_STATE_PROCESSING ;
1690+ if (pg_atomic_compare_exchange_u32 (& QUEUE_BACKEND_STATE (procno ),
1691+ & expected ,
1692+ ASYNC_STATE_SIGNALLED ))
1693+ {
1694+ /* Success - need to send signal for re-scan */
1695+ signal_needed = true;
1696+ if (Trace_notify )
1697+ elog (DEBUG1 , "SignalBackends: transitioned backend %d from PROCESSING to SIGNALLED for re-scan" , pid );
1698+ break ;
1699+ }
1700+ /* Another notifier already signaled - we're done */
1701+ break ;
1702+
1703+ default :
1704+ /* Should never happen */
1705+ elog (ERROR , "unexpected async state %u for backend %d" ,
1706+ current_state , pid );
16461707 }
16471708
1648- /*
1649- * Note: assuming things aren't broken, a signal failure here could
1650- * only occur if the target backend exited since we released
1651- * NotifyQueueLock; which is unlikely but certainly possible. So we
1652- * just log a low-level debug message if it happens.
1653- */
1654- if (SendProcSignal (pid , PROCSIG_NOTIFY_INTERRUPT , procnos [i ]) < 0 )
1655- elog (DEBUG3 , "could not signal backend with PID %d: %m" , pid );
1709+ /* Send signal if needed */
1710+ if (signal_needed )
1711+ {
1712+ /*
1713+ * For our own process, no need to involve the kernel
1714+ */
1715+ if (pid == MyProcPid )
1716+ {
1717+ SetLatch (MyLatch );
1718+ }
1719+ else
1720+ {
1721+ /*
1722+ * Note: assuming things aren't broken, a signal failure here could
1723+ * only occur if the target backend exited since we released
1724+ * NotifyQueueLock; which is unlikely but certainly possible. So we
1725+ * just log a low-level debug message if it happens.
1726+ */
1727+ if (SendProcSignal (pid , PROCSIG_NOTIFY_INTERRUPT , procno ) < 0 )
1728+ elog (DEBUG3 , "could not signal backend with PID %d: %m" , pid );
1729+ }
1730+ }
16561731 }
16571732
16581733 pfree (pids );
@@ -1805,20 +1880,43 @@ HandleNotifyInterrupt(void)
18051880{
18061881 /*
18071882 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1808- * you do here.
1883+ * you do here. The actual state transition has already been done by
1884+ * the notifier before sending the signal, so we only need to set the
1885+ * latch to ensure the backend wakes up and processes the notification.
18091886 */
18101887
1811- /* signal that work needs to be done */
1812- notifyInterruptPending = true;
1813-
18141888 /* make sure the event is processed in due course */
18151889 SetLatch (MyLatch );
18161890}
18171891
1892+ /*
1893+ * IsNotifyInterruptPending
1894+ *
1895+ * Check if there's a pending notify interrupt for this backend
1896+ */
1897+ bool
1898+ IsNotifyInterruptPending (void )
1899+ {
1900+ uint32 state ;
1901+
1902+ /* If not registered as a listener, no notifications are pending */
1903+ if (!amRegisteredListener )
1904+ return false;
1905+
1906+ /*
1907+ * Read the current state with a memory barrier to ensure we see
1908+ * the most recent value written by notifiers.
1909+ */
1910+ state = pg_atomic_read_membarrier_u32 (& QUEUE_BACKEND_STATE (MyProcNumber ));
1911+
1912+ /* Notification is pending if state is SIGNALLED */
1913+ return (state == ASYNC_STATE_SIGNALLED );
1914+ }
1915+
18181916/*
18191917 * ProcessNotifyInterrupt
18201918 *
1821- * This is called if we see notifyInterruptPending set , just before
1919+ * This is called if we see a notification interrupt is pending , just before
18221920 * transmitting ReadyForQuery at the end of a frontend command, and
18231921 * also if a notify signal occurs while reading from the frontend.
18241922 * HandleNotifyInterrupt() will cause the read to be interrupted
@@ -1837,7 +1935,7 @@ ProcessNotifyInterrupt(bool flush)
18371935 return ; /* not really idle */
18381936
18391937 /* Loop in case another signal arrives while sending messages */
1840- while (notifyInterruptPending )
1938+ while (IsNotifyInterruptPending () )
18411939 ProcessIncomingNotify (flush );
18421940}
18431941
@@ -2182,28 +2280,81 @@ asyncQueueAdvanceTail(void)
21822280static void
21832281ProcessIncomingNotify (bool flush )
21842282{
2185- /* We *must* reset the flag */
2186- notifyInterruptPending = false;
2283+ uint32 expected ;
21872284
2188- /* Do nothing else if we aren't actively listening */
2285+ /* Do nothing if we aren't actively listening */
21892286 if (listenChannels == NIL )
21902287 return ;
21912288
2289+ /*
2290+ * Perform state transition from SIGNALLED to PROCESSING.
2291+ * This is the "acquire lock" operation for the listener.
2292+ */
2293+ expected = ASYNC_STATE_SIGNALLED ;
2294+ if (!pg_atomic_compare_exchange_u32 (& QUEUE_BACKEND_STATE (MyProcNumber ),
2295+ & expected ,
2296+ ASYNC_STATE_PROCESSING ))
2297+ {
2298+ /*
2299+ * CAS failed - the state was not SIGNALLED. This should not happen
2300+ * as ProcessNotifyInterrupt only calls us when state is SIGNALLED.
2301+ */
2302+ elog (ERROR , "unexpected async state %u in ProcessIncomingNotify, expected SIGNALLED" ,
2303+ expected );
2304+ }
2305+
21922306 if (Trace_notify )
2193- elog (DEBUG1 , "ProcessIncomingNotify" );
2307+ elog (DEBUG1 , "ProcessIncomingNotify: transitioned to PROCESSING " );
21942308
21952309 set_ps_display ("notify interrupt" );
21962310
21972311 /*
2198- * We must run asyncQueueReadAllNotifications inside a transaction, else
2199- * bad things happen if it gets an error.
2200- */
2312+ * We must run asyncQueueReadAllNotifications inside a transaction, else
2313+ * bad things happen if it gets an error.
2314+ */
22012315 StartTransactionCommand ();
22022316
22032317 asyncQueueReadAllNotifications ();
22042318
22052319 CommitTransactionCommand ();
22062320
2321+ /*
2322+ * Try to transition from PROCESSING back to IDLE.
2323+ * This is the "release lock" operation for the listener.
2324+ */
2325+ expected = ASYNC_STATE_PROCESSING ;
2326+ if (pg_atomic_compare_exchange_u32 (& QUEUE_BACKEND_STATE (MyProcNumber ),
2327+ & expected ,
2328+ ASYNC_STATE_IDLE ))
2329+ {
2330+ /* Success - we're done, transitioned to IDLE */
2331+ if (Trace_notify )
2332+ elog (DEBUG1 , "ProcessIncomingNotify: transitioned to IDLE" );
2333+ }
2334+ else
2335+ {
2336+ /* CAS failed - check what the new state is */
2337+ if (expected == ASYNC_STATE_SIGNALLED )
2338+ {
2339+ /*
2340+ * A notifier set our state to SIGNALLED while we were processing.
2341+ * We are done with this batch of work, but we know there is more
2342+ * to do. Rather than loop here and risk starving other backend
2343+ * activity, we set our own latch to ensure we are woken up again
2344+ * to re-process, and then exit. The state is left as SIGNALLED.
2345+ */
2346+ if (Trace_notify )
2347+ elog (DEBUG1 , "ProcessIncomingNotify: signalled while processing" );
2348+ SetLatch (MyLatch );
2349+ }
2350+ else
2351+ {
2352+ /* Any other state is an error */
2353+ elog (ERROR , "unexpected async state %u when trying to return to IDLE" ,
2354+ expected );
2355+ }
2356+ }
2357+
22072358 /*
22082359 * If this isn't an end-of-command case, we must flush the notify messages
22092360 * to ensure frontend gets them promptly.
0 commit comments