133133#include "access/slru.h"
134134#include "access/transam.h"
135135#include "access/xact.h"
136+ #include "catalog/pg_collation.h"
136137#include "catalog/pg_database.h"
137138#include "commands/async.h"
138139#include "common/hashfn.h"
@@ -312,6 +313,12 @@ static SlruCtlData NotifyCtlData;
312313
313314#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
314315
316+ typedef struct
317+ {
318+ bool ispatt ;
319+ char channel [FLEXIBLE_ARRAY_MEMBER ]; /* nul-terminated string */
320+ } ListenChannel ;
321+
315322/*
316323 * listenChannels identifies the channels we are actually listening to
317324 * (ie, have committed a LISTEN on). It is a simple list of channel names,
@@ -339,6 +346,7 @@ typedef enum
339346typedef struct
340347{
341348 ListenActionKind action ;
349+ bool ispatt ;
342350 char channel [FLEXIBLE_ARRAY_MEMBER ]; /* nul-terminated string */
343351} ListenAction ;
344352
@@ -430,13 +438,13 @@ int max_notify_queue_pages = 1048576;
430438/* local function prototypes */
431439static inline int64 asyncQueuePageDiff (int64 p , int64 q );
432440static inline bool asyncQueuePagePrecedes (int64 p , int64 q );
433- static void queue_listen (ListenActionKind action , const char * channel );
441+ static void queue_listen (ListenActionKind action , const bool ispatt , const char * channel );
434442static void Async_UnlistenOnExit (int code , Datum arg );
435443static void Exec_ListenPreCommit (void );
436- static void Exec_ListenCommit (const char * channel );
437- static void Exec_UnlistenCommit (const char * channel );
444+ static void Exec_ListenCommit (const bool ispatt , const char * channel );
445+ static void Exec_UnlistenCommit (const bool ispatt , const char * channel );
438446static void Exec_UnlistenAllCommit (void );
439- static bool IsListeningOn (const char * channel );
447+ static bool IsListeningOn (const bool trymatch , const bool ispatt , const char * channel );
440448static void asyncQueueUnregister (void );
441449static bool asyncQueueIsFull (void );
442450static bool asyncQueueAdvance (volatile QueuePosition * position , int entryLength );
@@ -687,7 +695,7 @@ Async_Notify(const char *channel, const char *payload)
687695 * commit.
688696 */
689697static void
690- queue_listen (ListenActionKind action , const char * channel )
698+ queue_listen (ListenActionKind action , const bool ispatt , const char * channel )
691699{
692700 MemoryContext oldcontext ;
693701 ListenAction * actrec ;
@@ -705,6 +713,7 @@ queue_listen(ListenActionKind action, const char *channel)
705713 actrec = (ListenAction * ) palloc (offsetof(ListenAction , channel ) +
706714 strlen (channel ) + 1 );
707715 actrec -> action = action ;
716+ actrec -> ispatt = ispatt ;
708717 strcpy (actrec -> channel , channel );
709718
710719 if (pendingActions == NULL || my_level > pendingActions -> nestingLevel )
@@ -735,12 +744,12 @@ queue_listen(ListenActionKind action, const char *channel)
735744 * This is executed by the SQL listen command.
736745 */
737746void
738- Async_Listen (const char * channel )
747+ Async_Listen (const bool ispatt , const char * channel )
739748{
740749 if (Trace_notify )
741750 elog (DEBUG1 , "Async_Listen(%s,%d)" , channel , MyProcPid );
742751
743- queue_listen (LISTEN_LISTEN , channel );
752+ queue_listen (LISTEN_LISTEN , ispatt , channel );
744753}
745754
746755/*
@@ -749,7 +758,7 @@ Async_Listen(const char *channel)
749758 * This is executed by the SQL unlisten command.
750759 */
751760void
752- Async_Unlisten (const char * channel )
761+ Async_Unlisten (const bool ispatt , const char * channel )
753762{
754763 if (Trace_notify )
755764 elog (DEBUG1 , "Async_Unlisten(%s,%d)" , channel , MyProcPid );
@@ -758,7 +767,7 @@ Async_Unlisten(const char *channel)
758767 if (pendingActions == NULL && !unlistenExitRegistered )
759768 return ;
760769
761- queue_listen (LISTEN_UNLISTEN , channel );
770+ queue_listen (LISTEN_UNLISTEN , ispatt , channel );
762771}
763772
764773/*
@@ -776,7 +785,7 @@ Async_UnlistenAll(void)
776785 if (pendingActions == NULL && !unlistenExitRegistered )
777786 return ;
778787
779- queue_listen (LISTEN_UNLISTEN_ALL , "" );
788+ queue_listen (LISTEN_UNLISTEN_ALL , false, "" );
780789}
781790
782791/*
@@ -803,10 +812,31 @@ pg_listening_channels(PG_FUNCTION_ARGS)
803812
804813 if (funcctx -> call_cntr < list_length (listenChannels ))
805814 {
806- char * channel = (char * ) list_nth (listenChannels ,
807- funcctx -> call_cntr );
815+ ListenChannel * chnl ;
816+
817+ chnl = (ListenChannel * )list_nth (listenChannels , funcctx -> call_cntr );
818+
819+ if (chnl -> ispatt )
820+ {
821+ Size plen ;
822+ char * result ;
823+ MemoryContext oldcontext ;
824+
825+ oldcontext = MemoryContextSwitchTo (funcctx -> multi_call_memory_ctx );
826+
827+ plen = strlen (chnl -> channel );
828+ result = (char * )palloc (plen + 3 );
829+ result [0 ] = '\'' ;
830+ memcpy (result + 1 , chnl -> channel , plen );
831+ result [plen + 1 ] = '\'' ;
832+ result [plen + 2 ] = '\0' ;
808833
809- SRF_RETURN_NEXT (funcctx , CStringGetTextDatum (channel ));
834+ MemoryContextSwitchTo (oldcontext );
835+
836+ SRF_RETURN_NEXT (funcctx , CStringGetTextDatum (result ));
837+ }
838+ else
839+ SRF_RETURN_NEXT (funcctx , CStringGetTextDatum (chnl -> channel ));
810840 }
811841
812842 SRF_RETURN_DONE (funcctx );
@@ -989,10 +1019,10 @@ AtCommit_Notify(void)
9891019 switch (actrec -> action )
9901020 {
9911021 case LISTEN_LISTEN :
992- Exec_ListenCommit (actrec -> channel );
1022+ Exec_ListenCommit (actrec -> ispatt , actrec -> channel );
9931023 break ;
9941024 case LISTEN_UNLISTEN :
995- Exec_UnlistenCommit (actrec -> channel );
1025+ Exec_UnlistenCommit (actrec -> ispatt , actrec -> channel );
9961026 break ;
9971027 case LISTEN_UNLISTEN_ALL :
9981028 Exec_UnlistenAllCommit ();
@@ -1133,12 +1163,13 @@ Exec_ListenPreCommit(void)
11331163 * Add the channel to the list of channels we are listening on.
11341164 */
11351165static void
1136- Exec_ListenCommit (const char * channel )
1166+ Exec_ListenCommit (const bool ispatt , const char * channel )
11371167{
1138- MemoryContext oldcontext ;
1168+ MemoryContext oldcontext ;
1169+ ListenChannel * chnl ;
11391170
11401171 /* Do nothing if we are already listening on this channel */
1141- if (IsListeningOn (channel ))
1172+ if (IsListeningOn (false, ispatt , channel ))
11421173 return ;
11431174
11441175 /*
@@ -1150,7 +1181,15 @@ Exec_ListenCommit(const char *channel)
11501181 * later.
11511182 */
11521183 oldcontext = MemoryContextSwitchTo (TopMemoryContext );
1153- listenChannels = lappend (listenChannels , pstrdup (channel ));
1184+
1185+ chnl = (ListenChannel * ) palloc (offsetof(ListenChannel , channel ) +
1186+ strlen (channel ) + 1 );
1187+
1188+ chnl -> ispatt = ispatt ;
1189+ strcpy (chnl -> channel , channel );
1190+
1191+ listenChannels = lappend (listenChannels , chnl );
1192+
11541193 MemoryContextSwitchTo (oldcontext );
11551194}
11561195
@@ -1160,7 +1199,7 @@ Exec_ListenCommit(const char *channel)
11601199 * Remove the specified channel name from listenChannels.
11611200 */
11621201static void
1163- Exec_UnlistenCommit (const char * channel )
1202+ Exec_UnlistenCommit (const bool ispatt , const char * channel )
11641203{
11651204 ListCell * q ;
11661205
@@ -1169,9 +1208,12 @@ Exec_UnlistenCommit(const char *channel)
11691208
11701209 foreach (q , listenChannels )
11711210 {
1172- char * lchan = (char * ) lfirst (q );
1211+ ListenChannel * lchan = (ListenChannel * ) lfirst (q );
1212+
1213+ if (lchan -> ispatt != ispatt )
1214+ continue ;
11731215
1174- if (strcmp (lchan , channel ) == 0 )
1216+ if (strcmp (lchan -> channel , channel ) == 0 )
11751217 {
11761218 listenChannels = foreach_delete_current (listenChannels , q );
11771219 pfree (lchan );
@@ -1209,16 +1251,37 @@ Exec_UnlistenAllCommit(void)
12091251 * fairly short, though.
12101252 */
12111253static bool
1212- IsListeningOn (const char * channel )
1254+ IsListeningOn (const bool trymatch , const bool ispatt , const char * channel )
12131255{
12141256 ListCell * p ;
12151257
12161258 foreach (p , listenChannels )
12171259 {
1218- char * lchan = (char * ) lfirst (p );
1260+ ListenChannel * lchan = (ListenChannel * ) lfirst (p );
12191261
1220- if (strcmp (lchan , channel ) == 0 )
1221- return true;
1262+ if (trymatch )
1263+ {
1264+ Assert (!ispatt );
1265+
1266+ if (lchan -> ispatt )
1267+ {
1268+ Datum s = PointerGetDatum (cstring_to_text (channel ));
1269+ Datum p = PointerGetDatum (cstring_to_text (lchan -> channel ));
1270+
1271+ if (DatumGetBool (DirectFunctionCall2Coll (textlike , DEFAULT_COLLATION_OID , s , p )))
1272+ return true;
1273+ }
1274+ else if (strcmp (lchan -> channel , channel ) == 0 )
1275+ return true;
1276+ }
1277+ else
1278+ {
1279+ if (ispatt == lchan -> ispatt )
1280+ {
1281+ if (strcmp (lchan -> channel , channel ) == 0 )
1282+ return true;
1283+ }
1284+ }
12221285 }
12231286 return false;
12241287}
@@ -2071,7 +2134,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
20712134 /* qe->data is the null-terminated channel name */
20722135 char * channel = qe -> data ;
20732136
2074- if (IsListeningOn (channel ))
2137+ if (IsListeningOn (true, false, channel ))
20752138 {
20762139 /* payload follows channel name */
20772140 char * payload = qe -> data + strlen (channel ) + 1 ;
0 commit comments