1+ #include "postgres.h"
2+ #include "fmgr.h"
3+ #include "miscadmin.h"
4+ #include "postmaster/postmaster.h"
5+ #include "postmaster/bgworker.h"
6+ #include "storage/s_lock.h"
7+ #include "storage/spin.h"
8+ #include "storage/pg_sema.h"
9+ #include "storage/shmem.h"
10+
111#include "bgwpool.h"
212
313typedef struct
@@ -6,7 +16,7 @@ typedef struct
616 int id ;
717} BgwExecutorCtx ;
818
9- static void BgwMainLoop (Datum arg )
19+ static void BgwPoolMainLoop (Datum arg )
1020{
1121 BgwExecutorCtx * ctx = (BgwExecutorCtx * )arg ;
1222 int id = ctx -> id ;
@@ -19,34 +29,34 @@ static void BgwMainLoop(Datum arg)
1929 while (true) {
2030 PGSemaphoreLock (& pool -> available );
2131 SpinLockAcquire (& pool -> lock );
22- Assert ( pool -> head != pool -> tail ) ;
23- size = ( int * ) & pool -> queue [ pool -> head ] ;
24- void * work = palloc (len );
32+ size = * ( int * ) & pool -> queue [ pool -> head ] ;
33+ Assert ( size < pool -> size ) ;
34+ work = palloc (size );
2535 if (pool -> head + size + 4 > pool -> size ) {
2636 memcpy (work , pool -> queue , size );
27- pool -> head = (size & 3 ) & ~ 3 ;
37+ pool -> head = INTALIGN (size ) ;
2838 } else {
2939 memcpy (work , & pool -> queue [pool -> head + 4 ], size );
30- pool -> head += 4 + (( size & 3 ) & ~ 3 );
40+ pool -> head += 4 + INTALIGN ( size );
3141 }
3242 if (pool -> size == pool -> head ) {
3343 pool -> head = 0 ;
3444 }
3545 if (pool -> producerBlocked ) {
36- PGSemaphoreUnlock (& pool -> overflow );
3746 pool -> producerBlocked = false;
47+ PGSemaphoreUnlock (& pool -> overflow );
3848 }
3949 SpinLockRelease (& pool -> lock );
4050 pool -> executor (id , work , size );
4151 pfree (work );
4252 }
4353}
4454
45- BGWPool * BgwPoolCreate (BgwExecutor executor , char const * dbname , size_t queueSize , size_t nWorkers );
55+ BgwPool * BgwPoolCreate (BgwExecutor executor , char const * dbname , size_t queueSize , int nWorkers )
4656{
4757 int i ;
4858 BackgroundWorker worker ;
49- BGWPool * pool = (BGWPool * )ShmemAlloc (queueSize + sizeof (BGWPool ));
59+ BgwPool * pool = (BgwPool * )ShmemAlloc (queueSize + sizeof (BgwPool ));
5060 pool -> executor = executor ;
5161 PGSemaphoreCreate (& pool -> available );
5262 PGSemaphoreCreate (& pool -> overflow );
@@ -76,13 +86,13 @@ BGWPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSiz
7686 return pool ;
7787}
7888
79- void BgwPoolExecute (BgwPool * pool , void * work , size_t size );
89+ void BgwPoolExecute (BgwPool * pool , void * work , size_t size )
8090{
8191 Assert (size + 4 <= pool -> size );
8292
8393 SpinLockAcquire (& pool -> lock );
8494 while (true) {
85- if ((pool -> head < pool -> tail && pool -> size - pool -> tail < size + 4 && pool -> head < size )
95+ if ((pool -> head <= pool -> tail && pool -> size - pool -> tail < size + 4 && pool -> head < size )
8696 || (pool -> head > pool -> tail && pool -> head - pool -> tail < size + 4 ))
8797 {
8898 pool -> producerBlocked = true;
@@ -93,13 +103,18 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
93103 * (int * )& pool -> queue [pool -> tail ] = size ;
94104 if (pool -> size - pool -> tail >= size + 4 ) {
95105 memcpy (& pool -> queue [pool -> tail + 4 ], work , size );
96- pool -> tail += 4 + (size + 3 ) & ~ 3 ;
106+ pool -> tail += 4 + INTALIGN (size ) ;
97107 } else {
98108 memcpy (pool -> queue , work , size );
99- pool -> tail = (size + 3 ) & ~3 ;
109+ pool -> tail = INTALIGN (size );
110+ }
111+ if (pool -> tail == pool -> size ) {
112+ pool -> tail = 0 ;
100113 }
101114 PGSemaphoreUnlock (& pool -> available );
115+ break ;
102116 }
103117 }
118+ SpinLockRelease (& pool -> lock );
104119}
105120
0 commit comments