1212
1313typedef struct
1414{
15- BgwPool * pool ;
15+ BgwPoolConstructor constructor ;
1616 int id ;
17- } BgwExecutorCtx ;
17+ } BgwPoolExecutorCtx ;
1818
1919static void BgwPoolMainLoop (Datum arg )
2020{
21- BgwExecutorCtx * ctx = (BgwExecutorCtx * )arg ;
21+ BgwPoolExecutorCtx * ctx = (BgwPoolExecutorCtx * )arg ;
2222 int id = ctx -> id ;
23- BgwPool * pool = ctx -> pool ;
23+ BgwPool * pool = ctx -> constructor () ;
2424 int size ;
2525 void * work ;
2626
27+ BackgroundWorkerUnblockSignals ();
2728 BackgroundWorkerInitializeConnection (pool -> dbname , NULL );
2829
30+ elog (WARNING , "Start background worker %d" , id );
31+
2932 while (true) {
3033 PGSemaphoreLock (& pool -> available );
3134 SpinLockAcquire (& pool -> lock );
@@ -52,11 +55,9 @@ static void BgwPoolMainLoop(Datum arg)
5255 }
5356}
5457
55- BgwPool * BgwPoolCreate ( BgwExecutor executor , char const * dbname , size_t queueSize , int nWorkers )
58+ void BgwPoolInit ( BgwPool * pool , BgwPoolExecutor executor , char const * dbname , size_t queueSize )
5659{
57- int i ;
58- BackgroundWorker worker ;
59- BgwPool * pool = (BgwPool * )ShmemAlloc (queueSize + sizeof (BgwPool ));
60+ pool -> queue = (char * )ShmemAlloc (queueSize );
6061 pool -> executor = executor ;
6162 PGSemaphoreCreate (& pool -> available );
6263 PGSemaphoreCreate (& pool -> overflow );
@@ -68,22 +69,27 @@ BgwPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSiz
6869 pool -> tail = 0 ;
6970 pool -> size = queueSize ;
7071 strcpy (pool -> dbname , dbname );
72+ }
73+
74+ void BgwPoolStart (int nWorkers , BgwPoolConstructor constructor )
75+ {
76+ int i ;
77+ BackgroundWorker worker ;
7178
7279 MemSet (& worker , 0 , sizeof (BackgroundWorker ));
7380 worker .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION ;
7481 worker .bgw_start_time = BgWorkerStart_ConsistentState ;
7582 worker .bgw_main = BgwPoolMainLoop ;
7683 worker .bgw_restart_time = 10 ; /* Wait 10 seconds for restart before crash */
77-
84+
7885 for (i = 0 ; i < nWorkers ; i ++ ) {
79- BgwExecutorCtx * ctx = (BgwExecutorCtx * )malloc (sizeof (BgwExecutorCtx ));
86+ BgwPoolExecutorCtx * ctx = (BgwPoolExecutorCtx * )malloc (sizeof (BgwPoolExecutorCtx ));
8087 snprintf (worker .bgw_name , BGW_MAXLEN , "bgw_pool_worker_%d" , i + 1 );
8188 ctx -> id = i ;
82- ctx -> pool = pool ;
89+ ctx -> constructor = constructor ;
8390 worker .bgw_main_arg = (Datum )ctx ;
8491 RegisterBackgroundWorker (& worker );
8592 }
86- return pool ;
8793}
8894
8995void BgwPoolExecute (BgwPool * pool , void * work , size_t size )
0 commit comments