@@ -76,8 +76,6 @@ static ShutdownInformation shutdown_info;
7676static const char * modulename = gettext_noop ("parallel archiver" );
7777
7878static ParallelSlot * GetMyPSlot (ParallelState * pstate );
79- static void parallel_msg_master (ParallelSlot * slot , const char * modulename ,
80- const char * fmt , va_list ap ) pg_attribute_printf (3 , 0 );
8179static void archive_close_connection (int code , void * arg );
8280static void ShutdownWorkersHard (ParallelState * pstate );
8381static void WaitForTerminatingWorkers (ParallelState * pstate );
@@ -161,65 +159,6 @@ GetMyPSlot(ParallelState *pstate)
161159 return NULL ;
162160}
163161
164- /*
165- * Fail and die, with a message to stderr. Parameters as for write_msg.
166- *
167- * This is defined in parallel.c, because in parallel mode, things are more
168- * complicated. If the worker process does exit_horribly(), we forward its
169- * last words to the master process. The master process then does
170- * exit_horribly() with this error message itself and prints it normally.
171- * After printing the message, exit_horribly() on the master will shut down
172- * the remaining worker processes.
173- */
174- void
175- exit_horribly (const char * modulename , const char * fmt ,...)
176- {
177- va_list ap ;
178- ParallelState * pstate = shutdown_info .pstate ;
179- ParallelSlot * slot ;
180-
181- va_start (ap , fmt );
182-
183- if (pstate == NULL )
184- {
185- /* Not in parallel mode, just write to stderr */
186- vwrite_msg (modulename , fmt , ap );
187- }
188- else
189- {
190- slot = GetMyPSlot (pstate );
191-
192- if (!slot )
193- /* We're the parent, just write the message out */
194- vwrite_msg (modulename , fmt , ap );
195- else
196- /* If we're a worker process, send the msg to the master process */
197- parallel_msg_master (slot , modulename , fmt , ap );
198- }
199-
200- va_end (ap );
201-
202- exit_nicely (1 );
203- }
204-
205- /* Sends the error message from the worker to the master process */
206- static void
207- parallel_msg_master (ParallelSlot * slot , const char * modulename ,
208- const char * fmt , va_list ap )
209- {
210- char buf [512 ];
211- int pipefd [2 ];
212-
213- pipefd [PIPE_READ ] = slot -> pipeRevRead ;
214- pipefd [PIPE_WRITE ] = slot -> pipeRevWrite ;
215-
216- strcpy (buf , "ERROR " );
217- vsnprintf (buf + strlen ("ERROR " ),
218- sizeof (buf ) - strlen ("ERROR " ), fmt , ap );
219-
220- sendMessageToMaster (pipefd , buf );
221- }
222-
223162/*
224163 * A thread-local version of getLocalPQExpBuffer().
225164 *
@@ -270,7 +209,7 @@ getThreadLocalPQExpBuffer(void)
270209
271210/*
272211 * pg_dump and pg_restore register the Archive pointer for the exit handler
273- * (called from exit_horribly ). This function mainly exists so that we can
212+ * (called from exit_nicely). This function mainly exists so that we can
274213 * keep shutdown_info in file scope only.
275214 */
276215void
@@ -281,8 +220,8 @@ on_exit_close_archive(Archive *AHX)
281220}
282221
283222/*
284- * This function can close archives in both the parallel and non-parallel
285- * case .
223+ * on_exit_nicely handler for shutting down database connections and
224+ * worker processes cleanly.
286225 */
287226static void
288227archive_close_connection (int code , void * arg )
@@ -291,42 +230,62 @@ archive_close_connection(int code, void *arg)
291230
292231 if (si -> pstate )
293232 {
233+ /* In parallel mode, must figure out who we are */
294234 ParallelSlot * slot = GetMyPSlot (si -> pstate );
295235
296236 if (!slot )
297237 {
298238 /*
299- * We're the master: We have already printed out the message
300- * passed to exit_horribly() either from the master itself or from
301- * a worker process. Now we need to close our own database
302- * connection (only open during parallel dump but not restore) and
303- * shut down the remaining workers.
239+ * We're the master. Close our own database connection, if any,
240+ * and then forcibly shut down workers.
304241 */
305- DisconnectDatabase (si -> AHX );
242+ if (si -> AHX )
243+ DisconnectDatabase (si -> AHX );
244+
306245#ifndef WIN32
307246
308247 /*
309- * Setting aborting to true switches to best-effort-mode
310- * (send/receive but ignore errors) in communicating with our
311- * workers.
248+ * Setting aborting to true shuts off error/warning messages that
249+ * are no longer useful once we start killing workers.
312250 */
313251 aborting = true;
314252#endif
315253 ShutdownWorkersHard (si -> pstate );
316254 }
317- else if (slot -> args -> AH )
318- DisconnectDatabase (& (slot -> args -> AH -> public ));
255+ else
256+ {
257+ /*
258+ * We're a worker. Shut down our own DB connection if any. On
259+ * Windows, we also have to close our communication sockets, to
260+ * emulate what will happen on Unix when the worker process exits.
261+ * (Without this, if this is a premature exit, the master would
262+ * fail to detect it because there would be no EOF condition on
263+ * the other end of the pipe.)
264+ */
265+ if (slot -> args -> AH )
266+ DisconnectDatabase (& (slot -> args -> AH -> public ));
267+
268+ #ifdef WIN32
269+ closesocket (slot -> pipeRevRead );
270+ closesocket (slot -> pipeRevWrite );
271+ #endif
272+ }
273+ }
274+ else
275+ {
276+ /* Non-parallel operation: just kill the master DB connection */
277+ if (si -> AHX )
278+ DisconnectDatabase (si -> AHX );
319279 }
320- else if (si -> AHX )
321- DisconnectDatabase (si -> AHX );
322280}
323281
324282/*
325283 * If we have one worker that terminates for some reason, we'd like the other
326284 * threads to terminate as well (and not finish with their 70 GB table dump
327285 * first...). Now in UNIX we can just kill these processes, and let the signal
328286 * handler set wantAbort to 1. In Windows we set a termEvent and this serves
329- * as the signal for everyone to terminate.
287+ * as the signal for everyone to terminate. We don't print any error message;
288+ * that would just clutter the screen.
330289 */
331290void
332291checkAborting (ArchiveHandle * AH )
@@ -336,7 +295,7 @@ checkAborting(ArchiveHandle *AH)
336295#else
337296 if (wantAbort )
338297#endif
339- exit_horribly ( modulename , "worker is terminating\n" );
298+ exit_nicely ( 1 );
340299}
341300
342301/*
@@ -351,8 +310,6 @@ ShutdownWorkersHard(ParallelState *pstate)
351310#ifndef WIN32
352311 int i ;
353312
354- signal (SIGPIPE , SIG_IGN );
355-
356313 /*
357314 * Close our write end of the sockets so that the workers know they can
358315 * exit.
@@ -427,27 +384,21 @@ sigTermHandler(int signum)
427384#endif
428385
429386/*
430- * This function is called by both UNIX and Windows variants to set up a
431- * worker process.
387+ * This function is called by both UNIX and Windows variants to set up
388+ * and run a worker process. Caller should exit the process (or thread)
389+ * upon return.
432390 */
433391static void
434392SetupWorker (ArchiveHandle * AH , int pipefd [2 ], int worker )
435393{
436394 /*
437395 * Call the setup worker function that's defined in the ArchiveHandle.
438- *
439- * We get the raw connection only for the reason that we can close it
440- * properly when we shut down. This happens only that way when it is
441- * brought down because of an error.
442396 */
443397 (AH -> SetupWorkerPtr ) ((Archive * ) AH );
444398
445399 Assert (AH -> connection != NULL );
446400
447401 WaitForCommands (AH , pipefd );
448-
449- closesocket (pipefd [PIPE_READ ]);
450- closesocket (pipefd [PIPE_WRITE ]);
451402}
452403
453404#ifdef WIN32
@@ -533,14 +484,22 @@ ParallelBackupStart(ArchiveHandle *AH)
533484 pstate -> parallelSlot [i ].args = (ParallelArgs * ) pg_malloc (sizeof (ParallelArgs ));
534485 pstate -> parallelSlot [i ].args -> AH = NULL ;
535486 pstate -> parallelSlot [i ].args -> te = NULL ;
487+
488+ /* master's ends of the pipes */
489+ pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
490+ pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
491+ /* child's ends of the pipes */
492+ pstate -> parallelSlot [i ].pipeRevRead = pipeMW [PIPE_READ ];
493+ pstate -> parallelSlot [i ].pipeRevWrite = pipeWM [PIPE_WRITE ];
494+
536495#ifdef WIN32
537496 /* Allocate a new structure for every worker */
538497 wi = (WorkerInfo * ) pg_malloc (sizeof (WorkerInfo ));
539498
540499 wi -> worker = i ;
541500 wi -> AH = AH ;
542- wi -> pipeRead = pstate -> parallelSlot [ i ]. pipeRevRead = pipeMW [PIPE_READ ];
543- wi -> pipeWrite = pstate -> parallelSlot [ i ]. pipeRevWrite = pipeWM [PIPE_WRITE ];
501+ wi -> pipeRead = pipeMW [PIPE_READ ];
502+ wi -> pipeWrite = pipeWM [PIPE_WRITE ];
544503
545504 handle = _beginthreadex (NULL , 0 , (void * ) & init_spawned_worker_win32 ,
546505 wi , 0 , & (pstate -> parallelSlot [i ].threadId ));
@@ -556,15 +515,6 @@ ParallelBackupStart(ArchiveHandle *AH)
556515 pipefd [0 ] = pipeMW [PIPE_READ ];
557516 pipefd [1 ] = pipeWM [PIPE_WRITE ];
558517
559- /*
560- * Store the fds for the reverse communication in pstate. Actually
561- * we only use this in case of an error and don't use pstate
562- * otherwise in the worker process. On Windows we write to the
563- * global pstate, in Unix we write to our process-local copy but
564- * that's also where we'd retrieve this information back from.
565- */
566- pstate -> parallelSlot [i ].pipeRevRead = pipefd [PIPE_READ ];
567- pstate -> parallelSlot [i ].pipeRevWrite = pipefd [PIPE_WRITE ];
568518 pstate -> parallelSlot [i ].pid = getpid ();
569519
570520 /*
@@ -583,7 +533,7 @@ ParallelBackupStart(ArchiveHandle *AH)
583533
584534 /*
585535 * Close all inherited fds for communication of the master with
586- * the other workers.
536+ * previously-forked workers.
587537 */
588538 for (j = 0 ; j < i ; j ++ )
589539 {
@@ -611,11 +561,16 @@ ParallelBackupStart(ArchiveHandle *AH)
611561
612562 pstate -> parallelSlot [i ].pid = pid ;
613563#endif
614-
615- pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
616- pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
617564 }
618565
566+ /*
567+ * Having forked off the workers, disable SIGPIPE so that the master isn't
568+ * killed if it tries to send a command to a dead worker.
569+ */
570+ #ifndef WIN32
571+ signal (SIGPIPE , SIG_IGN );
572+ #endif
573+
619574 return pstate ;
620575}
621576
@@ -976,16 +931,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
976931 }
977932 else
978933 exit_horribly (modulename ,
979- "invalid message received from worker: %s\n" , msg );
980- }
981- else if (messageStartsWith (msg , "ERROR " ))
982- {
983- Assert (AH -> format == archDirectory || AH -> format == archCustom );
984- pstate -> parallelSlot [worker ].workerStatus = WRKR_TERMINATED ;
985- exit_horribly (modulename , "%s" , msg + strlen ("ERROR " ));
934+ "invalid message received from worker: \"%s\"\n" ,
935+ msg );
986936 }
987937 else
988- exit_horribly (modulename , "invalid message received from worker: %s\n" , msg );
938+ exit_horribly (modulename ,
939+ "invalid message received from worker: \"%s\"\n" ,
940+ msg );
989941
990942 /* both Unix and Win32 return pg_malloc()ed space, so we free it */
991943 free (msg );
0 commit comments