3535 * the required action (dump or restore) and returns a malloc'd status string.
3636 * The status string is passed back to the master where it is interpreted by
3737 * AH->MasterEndParallelItemPtr, another format-specific routine. That
38- * function can update state or catalog information on the master's side,
38+ * function can update format-specific information on the master's side,
3939 * depending on the reply from the worker process. In the end it returns a
40- * status code, which is 0 for successful execution.
40+ * status code, which we pass to the ParallelCompletionPtr callback function
41+ * that was passed to DispatchJobForTocEntry(). The callback function does
42+ * state updating for the master control logic in pg_backup_archiver.c.
4143 *
4244 * Remember that we have forked off the workers only after we have read in
4345 * the catalog. That's why our worker processes can also access the catalog
4850 * In the master process, the workerStatus field for each worker has one of
4951 * the following values:
5052 * WRKR_IDLE: it's waiting for a command
51- * WRKR_WORKING: it's been sent a command
52- * WRKR_FINISHED: it's returned a result
53+ * WRKR_WORKING: it's working on a command
5354 * WRKR_TERMINATED: process ended
54- * The FINISHED state indicates that the worker is idle, but we've not yet
55- * dealt with the status code it returned from the prior command.
56- * ReapWorkerStatus() extracts the unhandled command status value and sets
57- * the workerStatus back to WRKR_IDLE.
5855 */
5956
6057#include "postgres_fe.h"
7976#define PIPE_READ 0
8077#define PIPE_WRITE 1
8178
79+ #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
80+
8281#ifdef WIN32
8382
8483/*
@@ -175,9 +174,12 @@ static void setup_cancel_handler(void);
175174static void set_cancel_pstate (ParallelState * pstate );
176175static void set_cancel_slot_archive (ParallelSlot * slot , ArchiveHandle * AH );
177176static void RunWorker (ArchiveHandle * AH , ParallelSlot * slot );
177+ static int GetIdleWorker (ParallelState * pstate );
178178static bool HasEveryWorkerTerminated (ParallelState * pstate );
179179static void lockTableForWorker (ArchiveHandle * AH , TocEntry * te );
180180static void WaitForCommands (ArchiveHandle * AH , int pipefd [2 ]);
181+ static bool ListenToWorkers (ArchiveHandle * AH , ParallelState * pstate ,
182+ bool do_wait );
181183static char * getMessageFromMaster (int pipefd [2 ]);
182184static void sendMessageToMaster (int pipefd [2 ], const char * str );
183185static int select_loop (int maxFd , fd_set * workerset );
@@ -349,8 +351,8 @@ archive_close_connection(int code, void *arg)
349351 * fail to detect it because there would be no EOF condition on
350352 * the other end of the pipe.)
351353 */
352- if (slot -> args -> AH )
353- DisconnectDatabase (& (slot -> args -> AH -> public ));
354+ if (slot -> AH )
355+ DisconnectDatabase (& (slot -> AH -> public ));
354356
355357#ifdef WIN32
356358 closesocket (slot -> pipeRevRead );
@@ -407,7 +409,7 @@ ShutdownWorkersHard(ParallelState *pstate)
407409 EnterCriticalSection (& signal_info_lock );
408410 for (i = 0 ; i < pstate -> numWorkers ; i ++ )
409411 {
410- ArchiveHandle * AH = pstate -> parallelSlot [i ].args -> AH ;
412+ ArchiveHandle * AH = pstate -> parallelSlot [i ].AH ;
411413 char errbuf [1 ];
412414
413415 if (AH != NULL && AH -> connCancel != NULL )
@@ -634,7 +636,7 @@ consoleHandler(DWORD dwCtrlType)
634636 for (i = 0 ; i < signal_info .pstate -> numWorkers ; i ++ )
635637 {
636638 ParallelSlot * slot = & (signal_info .pstate -> parallelSlot [i ]);
637- ArchiveHandle * AH = slot -> args -> AH ;
639+ ArchiveHandle * AH = slot -> AH ;
638640 HANDLE hThread = (HANDLE ) slot -> hThread ;
639641
640642 /*
@@ -789,7 +791,7 @@ set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
789791 EnterCriticalSection (& signal_info_lock );
790792#endif
791793
792- slot -> args -> AH = AH ;
794+ slot -> AH = AH ;
793795
794796#ifdef WIN32
795797 LeaveCriticalSection (& signal_info_lock );
@@ -935,9 +937,10 @@ ParallelBackupStart(ArchiveHandle *AH)
935937 strerror (errno ));
936938
937939 slot -> workerStatus = WRKR_IDLE ;
938- slot -> args = (ParallelArgs * ) pg_malloc (sizeof (ParallelArgs ));
939- slot -> args -> AH = NULL ;
940- slot -> args -> te = NULL ;
940+ slot -> AH = NULL ;
941+ slot -> te = NULL ;
942+ slot -> callback = NULL ;
943+ slot -> callback_data = NULL ;
941944
942945 /* master's ends of the pipes */
943946 slot -> pipeRead = pipeWM [PIPE_READ ];
@@ -1071,20 +1074,28 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
10711074}
10721075
10731076/*
1074- * Dispatch a job to some free worker (caller must ensure there is one!)
1077+ * Dispatch a job to some free worker.
10751078 *
10761079 * te is the TocEntry to be processed, act is the action to be taken on it.
1080+ * callback is the function to call on completion of the job.
1081+ *
1082+ * If no worker is currently available, this will block, and previously
1083+ * registered callback functions may be called.
10771084 */
10781085void
1079- DispatchJobForTocEntry (ArchiveHandle * AH , ParallelState * pstate , TocEntry * te ,
1080- T_Action act )
1086+ DispatchJobForTocEntry (ArchiveHandle * AH ,
1087+ ParallelState * pstate ,
1088+ TocEntry * te ,
1089+ T_Action act ,
1090+ ParallelCompletionPtr callback ,
1091+ void * callback_data )
10811092{
10821093 int worker ;
10831094 char * arg ;
10841095
1085- /* our caller makes sure that at least one worker is idle */
1086- worker = GetIdleWorker (pstate );
1087- Assert ( worker != NO_SLOT );
1096+ /* Get a worker, waiting if none are idle */
1097+ while (( worker = GetIdleWorker (pstate )) == NO_SLOT )
1098+ WaitForWorkers ( AH , pstate , WFW_ONE_IDLE );
10881099
10891100 /* Construct and send command string */
10901101 arg = (AH -> MasterStartParallelItemPtr ) (AH , te , act );
@@ -1095,14 +1106,16 @@ DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
10951106
10961107 /* Remember worker is busy, and which TocEntry it's working on */
10971108 pstate -> parallelSlot [worker ].workerStatus = WRKR_WORKING ;
1098- pstate -> parallelSlot [worker ].args -> te = te ;
1109+ pstate -> parallelSlot [worker ].te = te ;
1110+ pstate -> parallelSlot [worker ].callback = callback ;
1111+ pstate -> parallelSlot [worker ].callback_data = callback_data ;
10991112}
11001113
11011114/*
11021115 * Find an idle worker and return its slot number.
11031116 * Return NO_SLOT if none are idle.
11041117 */
1105- int
1118+ static int
11061119GetIdleWorker (ParallelState * pstate )
11071120{
11081121 int i ;
@@ -1274,17 +1287,16 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
12741287 * immediately if there is none available.
12751288 *
12761289 * When we get a status message, we let MasterEndParallelItemPtr process it,
1277- * then save the resulting status code and switch the worker's state to
1278- * WRKR_FINISHED. Later, caller must call ReapWorkerStatus() to verify
1279- * that the status was "OK" and push the worker back to IDLE state.
1290+ * then pass the resulting status code to the callback function that was
1291+ * specified to DispatchJobForTocEntry, then reset the worker status to IDLE.
12801292 *
1281- * XXX Rube Goldberg would be proud of this API, but no one else should be .
1293+ * Returns true if we collected a status message, else false .
12821294 *
12831295 * XXX is it worth checking for more than one status message per call?
12841296 * It seems somewhat unlikely that multiple workers would finish at exactly
12851297 * the same time.
12861298 */
1287- void
1299+ static bool
12881300ListenToWorkers (ArchiveHandle * AH , ParallelState * pstate , bool do_wait )
12891301{
12901302 int worker ;
@@ -1298,34 +1310,39 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
12981310 /* If do_wait is true, we must have detected EOF on some socket */
12991311 if (do_wait )
13001312 exit_horribly (modulename , "a worker process died unexpectedly\n" );
1301- return ;
1313+ return false ;
13021314 }
13031315
13041316 /* Process it and update our idea of the worker's status */
13051317 if (messageStartsWith (msg , "OK " ))
13061318 {
1307- TocEntry * te = pstate -> parallelSlot [worker ].args -> te ;
1319+ ParallelSlot * slot = & pstate -> parallelSlot [worker ];
1320+ TocEntry * te = slot -> te ;
13081321 char * statusString ;
1322+ int status ;
13091323
13101324 if (messageStartsWith (msg , "OK RESTORE " ))
13111325 {
13121326 statusString = msg + strlen ("OK RESTORE " );
1313- pstate -> parallelSlot [ worker ]. status =
1327+ status =
13141328 (AH -> MasterEndParallelItemPtr )
13151329 (AH , te , statusString , ACT_RESTORE );
1330+ slot -> callback (AH , te , status , slot -> callback_data );
13161331 }
13171332 else if (messageStartsWith (msg , "OK DUMP " ))
13181333 {
13191334 statusString = msg + strlen ("OK DUMP " );
1320- pstate -> parallelSlot [ worker ]. status =
1335+ status =
13211336 (AH -> MasterEndParallelItemPtr )
13221337 (AH , te , statusString , ACT_DUMP );
1338+ slot -> callback (AH , te , status , slot -> callback_data );
13231339 }
13241340 else
13251341 exit_horribly (modulename ,
13261342 "invalid message received from worker: \"%s\"\n" ,
13271343 msg );
1328- pstate -> parallelSlot [worker ].workerStatus = WRKR_FINISHED ;
1344+ slot -> workerStatus = WRKR_IDLE ;
1345+ slot -> te = NULL ;
13291346 }
13301347 else
13311348 exit_horribly (modulename ,
@@ -1334,110 +1351,79 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
13341351
13351352 /* Free the string returned from getMessageFromWorker */
13361353 free (msg );
1337- }
1338-
1339- /*
1340- * Check to see if any worker is in WRKR_FINISHED state. If so,
1341- * return its command status code into *status, reset it to IDLE state,
1342- * and return its slot number. Otherwise return NO_SLOT.
1343- *
1344- * This function is executed in the master process.
1345- */
1346- int
1347- ReapWorkerStatus (ParallelState * pstate , int * status )
1348- {
1349- int i ;
13501354
1351- for (i = 0 ; i < pstate -> numWorkers ; i ++ )
1352- {
1353- if (pstate -> parallelSlot [i ].workerStatus == WRKR_FINISHED )
1354- {
1355- * status = pstate -> parallelSlot [i ].status ;
1356- pstate -> parallelSlot [i ].status = 0 ;
1357- pstate -> parallelSlot [i ].workerStatus = WRKR_IDLE ;
1358- return i ;
1359- }
1360- }
1361- return NO_SLOT ;
1355+ return true;
13621356}
13631357
13641358/*
1365- * Wait, if necessary, until we have at least one idle worker.
1366- * Reap worker status as necessary to move FINISHED workers to IDLE state.
1359+ * Check for status results from workers, waiting if necessary.
13671360 *
1368- * We assume that no extra processing is required when reaping a finished
1369- * command, except for checking that the status was OK (zero).
1370- * Caution: that assumption means that this function can only be used in
1371- * parallel dump, not parallel restore, because the latter has a more
1372- * complex set of rules about handling status.
1361+ * Available wait modes are:
1362+ * WFW_NO_WAIT: reap any available status, but don't block
1363+ * WFW_GOT_STATUS: wait for at least one more worker to finish
1364+ * WFW_ONE_IDLE: wait for at least one worker to be idle
1365+ * WFW_ALL_IDLE: wait for all workers to be idle
1366+ *
1367+ * Any received results are passed to MasterEndParallelItemPtr and then
1368+ * to the callback specified to DispatchJobForTocEntry.
13731369 *
13741370 * This function is executed in the master process.
13751371 */
13761372void
1377- EnsureIdleWorker (ArchiveHandle * AH , ParallelState * pstate )
1373+ WaitForWorkers (ArchiveHandle * AH , ParallelState * pstate , WFW_WaitOption mode )
13781374{
1379- int ret_worker ;
1380- int work_status ;
1375+ bool do_wait = false;
13811376
1382- for (;;)
1377+ /*
1378+ * In GOT_STATUS mode, always block waiting for a message, since we can't
1379+ * return till we get something. In other modes, we don't block the first
1380+ * time through the loop.
1381+ */
1382+ if (mode == WFW_GOT_STATUS )
13831383 {
1384- int nTerm = 0 ;
1385-
1386- while ((ret_worker = ReapWorkerStatus (pstate , & work_status )) != NO_SLOT )
1387- {
1388- if (work_status != 0 )
1389- exit_horribly (modulename , "error processing a parallel work item\n" );
1390-
1391- nTerm ++ ;
1392- }
1393-
1394- /*
1395- * We need to make sure that we have an idle worker before dispatching
1396- * the next item. If nTerm > 0 we already have that (quick check).
1397- */
1398- if (nTerm > 0 )
1399- return ;
1400-
1401- /* explicit check for an idle worker */
1402- if (GetIdleWorker (pstate ) != NO_SLOT )
1403- return ;
1384+ /* Assert that caller knows what it's doing */
1385+ Assert (!IsEveryWorkerIdle (pstate ));
1386+ do_wait = true;
1387+ }
14041388
1389+ for (;;)
1390+ {
14051391 /*
1406- * If we have no idle worker, read the result of one or more workers
1407- * and loop the loop to call ReapWorkerStatus() on them
1392+ * Check for status messages, even if we don't need to block. We do
1393+ * not try very hard to reap all available messages, though, since
1394+ * there's unlikely to be more than one.
14081395 */
1409- ListenToWorkers (AH , pstate , true);
1410- }
1411- }
1412-
1413- /*
1414- * Wait for all workers to be idle.
1415- * Reap worker status as necessary to move FINISHED workers to IDLE state.
1416- *
1417- * We assume that no extra processing is required when reaping a finished
1418- * command, except for checking that the status was OK (zero).
1419- * Caution: that assumption means that this function can only be used in
1420- * parallel dump, not parallel restore, because the latter has a more
1421- * complex set of rules about handling status.
1422- *
1423- * This function is executed in the master process.
1424- */
1425- void
1426- EnsureWorkersFinished (ArchiveHandle * AH , ParallelState * pstate )
1427- {
1428- int work_status ;
1396+ if (ListenToWorkers (AH , pstate , do_wait ))
1397+ {
1398+ /*
1399+ * If we got a message, we are done by definition for GOT_STATUS
1400+ * mode, and we can also be certain that there's at least one idle
1401+ * worker. So we're done in all but ALL_IDLE mode.
1402+ */
1403+ if (mode != WFW_ALL_IDLE )
1404+ return ;
1405+ }
14291406
1430- if (!pstate || pstate -> numWorkers == 1 )
1431- return ;
1407+ /* Check whether we must wait for new status messages */
1408+ switch (mode )
1409+ {
1410+ case WFW_NO_WAIT :
1411+ return ; /* never wait */
1412+ case WFW_GOT_STATUS :
1413+ Assert (false); /* can't get here, because we waited */
1414+ break ;
1415+ case WFW_ONE_IDLE :
1416+ if (GetIdleWorker (pstate ) != NO_SLOT )
1417+ return ;
1418+ break ;
1419+ case WFW_ALL_IDLE :
1420+ if (IsEveryWorkerIdle (pstate ))
1421+ return ;
1422+ break ;
1423+ }
14321424
1433- /* Waiting for the remaining worker processes to finish */
1434- while (!IsEveryWorkerIdle (pstate ))
1435- {
1436- if (ReapWorkerStatus (pstate , & work_status ) == NO_SLOT )
1437- ListenToWorkers (AH , pstate , true);
1438- else if (work_status != 0 )
1439- exit_horribly (modulename ,
1440- "error processing a parallel work item\n" );
1425+ /* Loop back, and this time wait for something to happen */
1426+ do_wait = true;
14411427 }
14421428}
14431429
0 commit comments