@@ -218,6 +218,7 @@ typedef struct QueueBackendStatus
218218{
219219 int32 pid ; /* either a PID or InvalidPid */
220220 Oid dboid ; /* backend's database OID, or InvalidOid */
221+ BackendId nextListener ; /* id of next listener, or InvalidBackendId */
221222 QueuePosition pos ; /* backend has read queue up to here */
222223} QueueBackendStatus ;
223224
@@ -241,12 +242,19 @@ typedef struct QueueBackendStatus
241242 * Each backend uses the backend[] array entry with index equal to its
242243 * BackendId (which can range from 1 to MaxBackends). We rely on this to make
243244 * SendProcSignal fast.
245+ *
246+ * The backend[] array entries for actively-listening backends are threaded
247+ * together using firstListener and the nextListener links, so that we can
248+ * scan them without having to iterate over inactive entries. We keep this
249+ * list in order by BackendId so that the scan is cache-friendly when there
250+ * are many active entries.
244251 */
245252typedef struct AsyncQueueControl
246253{
247254 QueuePosition head ; /* head points to the next free location */
248255 QueuePosition tail ; /* the global tail is equivalent to the pos of
249256 * the "slowest" backend */
257+ BackendId firstListener ; /* id of first listener, or InvalidBackendId */
250258 TimestampTz lastQueueFillWarn ; /* time of last queue-full msg */
251259 QueueBackendStatus backend [FLEXIBLE_ARRAY_MEMBER ];
252260 /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -256,8 +264,10 @@ static AsyncQueueControl *asyncQueueControl;
256264
257265#define QUEUE_HEAD (asyncQueueControl->head)
258266#define QUEUE_TAIL (asyncQueueControl->tail)
267+ #define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
259268#define QUEUE_BACKEND_PID (i ) (asyncQueueControl->backend[i].pid)
260269#define QUEUE_BACKEND_DBOID (i ) (asyncQueueControl->backend[i].dboid)
270+ #define QUEUE_NEXT_LISTENER (i ) (asyncQueueControl->backend[i].nextListener)
261271#define QUEUE_BACKEND_POS (i ) (asyncQueueControl->backend[i].pos)
262272
263273/*
@@ -490,16 +500,16 @@ AsyncShmemInit(void)
490500 if (!found )
491501 {
492502 /* First time through, so initialize it */
493- int i ;
494-
495503 SET_QUEUE_POS (QUEUE_HEAD , 0 , 0 );
496504 SET_QUEUE_POS (QUEUE_TAIL , 0 , 0 );
505+ QUEUE_FIRST_LISTENER = InvalidBackendId ;
497506 asyncQueueControl -> lastQueueFillWarn = 0 ;
498507 /* zero'th entry won't be used, but let's initialize it anyway */
499- for (i = 0 ; i <= MaxBackends ; i ++ )
508+ for (int i = 0 ; i <= MaxBackends ; i ++ )
500509 {
501510 QUEUE_BACKEND_PID (i ) = InvalidPid ;
502511 QUEUE_BACKEND_DBOID (i ) = InvalidOid ;
512+ QUEUE_NEXT_LISTENER (i ) = InvalidBackendId ;
503513 SET_QUEUE_POS (QUEUE_BACKEND_POS (i ), 0 , 0 );
504514 }
505515 }
@@ -959,7 +969,7 @@ Exec_ListenPreCommit(void)
959969{
960970 QueuePosition head ;
961971 QueuePosition max ;
962- int i ;
972+ BackendId prevListener ;
963973
964974 /*
965975 * Nothing to do if we are already listening to something, nor if we
@@ -996,26 +1006,37 @@ Exec_ListenPreCommit(void)
9961006 * our database; any notifications it's already advanced over are surely
9971007 * committed and need not be re-examined by us. (We must consider only
9981008 * backends connected to our DB, because others will not have bothered to
999- * check committed-ness of notifications in our DB.) But we only bother
1000- * with that if there's more than a page worth of notifications
1001- * outstanding, otherwise scanning all the other backends isn't worth it.
1009+ * check committed-ness of notifications in our DB.)
10021010 *
1003- * We need exclusive lock here so we can look at other backends' entries.
1011+ * We need exclusive lock here so we can look at other backends' entries
1012+ * and manipulate the list links.
10041013 */
10051014 LWLockAcquire (AsyncQueueLock , LW_EXCLUSIVE );
10061015 head = QUEUE_HEAD ;
10071016 max = QUEUE_TAIL ;
1008- if (QUEUE_POS_PAGE (max ) != QUEUE_POS_PAGE (head ))
1017+ prevListener = InvalidBackendId ;
1018+ for (BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER (i ))
10091019 {
1010- for ( i = 1 ; i <= MaxBackends ; i ++ )
1011- {
1012- if ( QUEUE_BACKEND_DBOID ( i ) == MyDatabaseId )
1013- max = QUEUE_POS_MAX ( max , QUEUE_BACKEND_POS ( i ));
1014- }
1020+ if ( QUEUE_BACKEND_DBOID ( i ) == MyDatabaseId )
1021+ max = QUEUE_POS_MAX ( max , QUEUE_BACKEND_POS ( i ));
1022+ /* Also find last listening backend before this one */
1023+ if ( i < MyBackendId )
1024+ prevListener = i ;
10151025 }
10161026 QUEUE_BACKEND_POS (MyBackendId ) = max ;
10171027 QUEUE_BACKEND_PID (MyBackendId ) = MyProcPid ;
10181028 QUEUE_BACKEND_DBOID (MyBackendId ) = MyDatabaseId ;
1029+ /* Insert backend into list of listeners at correct position */
1030+ if (prevListener > 0 )
1031+ {
1032+ QUEUE_NEXT_LISTENER (MyBackendId ) = QUEUE_NEXT_LISTENER (prevListener );
1033+ QUEUE_NEXT_LISTENER (prevListener ) = MyBackendId ;
1034+ }
1035+ else
1036+ {
1037+ QUEUE_NEXT_LISTENER (MyBackendId ) = QUEUE_FIRST_LISTENER ;
1038+ QUEUE_FIRST_LISTENER = MyBackendId ;
1039+ }
10191040 LWLockRelease (AsyncQueueLock );
10201041
10211042 /* Now we are listed in the global array, so remember we're listening */
@@ -1228,13 +1249,31 @@ asyncQueueUnregister(void)
12281249 if (!amRegisteredListener ) /* nothing to do */
12291250 return ;
12301251
1231- LWLockAcquire (AsyncQueueLock , LW_SHARED );
1252+ /*
1253+ * Need exclusive lock here to manipulate list links.
1254+ */
1255+ LWLockAcquire (AsyncQueueLock , LW_EXCLUSIVE );
12321256 /* check if entry is valid and oldest ... */
12331257 advanceTail = (MyProcPid == QUEUE_BACKEND_PID (MyBackendId )) &&
12341258 QUEUE_POS_EQUAL (QUEUE_BACKEND_POS (MyBackendId ), QUEUE_TAIL );
12351259 /* ... then mark it invalid */
12361260 QUEUE_BACKEND_PID (MyBackendId ) = InvalidPid ;
12371261 QUEUE_BACKEND_DBOID (MyBackendId ) = InvalidOid ;
1262+ /* and remove it from the list */
1263+ if (QUEUE_FIRST_LISTENER == MyBackendId )
1264+ QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER (MyBackendId );
1265+ else
1266+ {
1267+ for (BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER (i ))
1268+ {
1269+ if (QUEUE_NEXT_LISTENER (i ) == MyBackendId )
1270+ {
1271+ QUEUE_NEXT_LISTENER (i ) = QUEUE_NEXT_LISTENER (MyBackendId );
1272+ break ;
1273+ }
1274+ }
1275+ }
1276+ QUEUE_NEXT_LISTENER (MyBackendId ) = InvalidBackendId ;
12381277 LWLockRelease (AsyncQueueLock );
12391278
12401279 /* mark ourselves as no longer listed in the global array */
@@ -1508,16 +1547,13 @@ asyncQueueFillWarning(void)
15081547 {
15091548 QueuePosition min = QUEUE_HEAD ;
15101549 int32 minPid = InvalidPid ;
1511- int i ;
15121550
1513- for (i = 1 ; i <= MaxBackends ; i ++ )
1551+ for (BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER ( i ) )
15141552 {
1515- if (QUEUE_BACKEND_PID (i ) != InvalidPid )
1516- {
1517- min = QUEUE_POS_MIN (min , QUEUE_BACKEND_POS (i ));
1518- if (QUEUE_POS_EQUAL (min , QUEUE_BACKEND_POS (i )))
1519- minPid = QUEUE_BACKEND_PID (i );
1520- }
1553+ Assert (QUEUE_BACKEND_PID (i ) != InvalidPid );
1554+ min = QUEUE_POS_MIN (min , QUEUE_BACKEND_POS (i ));
1555+ if (QUEUE_POS_EQUAL (min , QUEUE_BACKEND_POS (i )))
1556+ minPid = QUEUE_BACKEND_PID (i );
15211557 }
15221558
15231559 ereport (WARNING ,
@@ -1553,7 +1589,6 @@ SignalBackends(void)
15531589 int32 * pids ;
15541590 BackendId * ids ;
15551591 int count ;
1556- int i ;
15571592 int32 pid ;
15581593
15591594 /*
@@ -1570,10 +1605,11 @@ SignalBackends(void)
15701605 count = 0 ;
15711606
15721607 LWLockAcquire (AsyncQueueLock , LW_EXCLUSIVE );
1573- for (i = 1 ; i <= MaxBackends ; i ++ )
1608+ for (BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER ( i ) )
15741609 {
15751610 pid = QUEUE_BACKEND_PID (i );
1576- if (pid != InvalidPid && pid != MyProcPid )
1611+ Assert (pid != InvalidPid );
1612+ if (pid != MyProcPid )
15771613 {
15781614 QueuePosition pos = QUEUE_BACKEND_POS (i );
15791615
@@ -1588,7 +1624,7 @@ SignalBackends(void)
15881624 LWLockRelease (AsyncQueueLock );
15891625
15901626 /* Now send signals */
1591- for (i = 0 ; i < count ; i ++ )
1627+ for (int i = 0 ; i < count ; i ++ )
15921628 {
15931629 pid = pids [i ];
15941630
@@ -2064,17 +2100,16 @@ static void
20642100asyncQueueAdvanceTail (void )
20652101{
20662102 QueuePosition min ;
2067- int i ;
20682103 int oldtailpage ;
20692104 int newtailpage ;
20702105 int boundary ;
20712106
20722107 LWLockAcquire (AsyncQueueLock , LW_EXCLUSIVE );
20732108 min = QUEUE_HEAD ;
2074- for (i = 1 ; i <= MaxBackends ; i ++ )
2109+ for (BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER ( i ) )
20752110 {
2076- if (QUEUE_BACKEND_PID (i ) != InvalidPid )
2077- min = QUEUE_POS_MIN (min , QUEUE_BACKEND_POS (i ));
2111+ Assert (QUEUE_BACKEND_PID (i ) != InvalidPid );
2112+ min = QUEUE_POS_MIN (min , QUEUE_BACKEND_POS (i ));
20782113 }
20792114 oldtailpage = QUEUE_POS_PAGE (QUEUE_TAIL );
20802115 QUEUE_TAIL = min ;
0 commit comments