@@ -77,8 +77,6 @@ static ShutdownInformation shutdown_info;
7777static const char * modulename = gettext_noop ("parallel archiver" );
7878
7979static ParallelSlot * GetMyPSlot (ParallelState * pstate );
80- static void parallel_msg_master (ParallelSlot * slot , const char * modulename ,
81- const char * fmt , va_list ap ) pg_attribute_printf (3 , 0 );
8280static void archive_close_connection (int code , void * arg );
8381static void ShutdownWorkersHard (ParallelState * pstate );
8482static void WaitForTerminatingWorkers (ParallelState * pstate );
@@ -162,65 +160,6 @@ GetMyPSlot(ParallelState *pstate)
162160 return NULL ;
163161}
164162
165- /*
166- * Fail and die, with a message to stderr. Parameters as for write_msg.
167- *
168- * This is defined in parallel.c, because in parallel mode, things are more
169- * complicated. If the worker process does exit_horribly(), we forward its
170- * last words to the master process. The master process then does
171- * exit_horribly() with this error message itself and prints it normally.
172- * After printing the message, exit_horribly() on the master will shut down
173- * the remaining worker processes.
174- */
175- void
176- exit_horribly (const char * modulename , const char * fmt ,...)
177- {
178- va_list ap ;
179- ParallelState * pstate = shutdown_info .pstate ;
180- ParallelSlot * slot ;
181-
182- va_start (ap , fmt );
183-
184- if (pstate == NULL )
185- {
186- /* Not in parallel mode, just write to stderr */
187- vwrite_msg (modulename , fmt , ap );
188- }
189- else
190- {
191- slot = GetMyPSlot (pstate );
192-
193- if (!slot )
194- /* We're the parent, just write the message out */
195- vwrite_msg (modulename , fmt , ap );
196- else
197- /* If we're a worker process, send the msg to the master process */
198- parallel_msg_master (slot , modulename , fmt , ap );
199- }
200-
201- va_end (ap );
202-
203- exit_nicely (1 );
204- }
205-
206- /* Sends the error message from the worker to the master process */
207- static void
208- parallel_msg_master (ParallelSlot * slot , const char * modulename ,
209- const char * fmt , va_list ap )
210- {
211- char buf [512 ];
212- int pipefd [2 ];
213-
214- pipefd [PIPE_READ ] = slot -> pipeRevRead ;
215- pipefd [PIPE_WRITE ] = slot -> pipeRevWrite ;
216-
217- strcpy (buf , "ERROR " );
218- vsnprintf (buf + strlen ("ERROR " ),
219- sizeof (buf ) - strlen ("ERROR " ), fmt , ap );
220-
221- sendMessageToMaster (pipefd , buf );
222- }
223-
224163/*
225164 * A thread-local version of getLocalPQExpBuffer().
226165 *
@@ -271,7 +210,7 @@ getThreadLocalPQExpBuffer(void)
271210
272211/*
273212 * pg_dump and pg_restore register the Archive pointer for the exit handler
274- * (called from exit_horribly ). This function mainly exists so that we can
213+ * (called from exit_nicely ). This function mainly exists so that we can
275214 * keep shutdown_info in file scope only.
276215 */
277216void
@@ -282,8 +221,8 @@ on_exit_close_archive(Archive *AHX)
282221}
283222
284223/*
285- * This function can close archives in both the parallel and non-parallel
286- * case .
224+ * on_exit_nicely handler for shutting down database connections and
225+ * worker processes cleanly .
287226 */
288227static void
289228archive_close_connection (int code , void * arg )
@@ -292,42 +231,62 @@ archive_close_connection(int code, void *arg)
292231
293232 if (si -> pstate )
294233 {
234+ /* In parallel mode, must figure out who we are */
295235 ParallelSlot * slot = GetMyPSlot (si -> pstate );
296236
297237 if (!slot )
298238 {
299239 /*
300- * We're the master: We have already printed out the message
301- * passed to exit_horribly() either from the master itself or from
302- * a worker process. Now we need to close our own database
303- * connection (only open during parallel dump but not restore) and
304- * shut down the remaining workers.
240+ * We're the master. Close our own database connection, if any,
241+ * and then forcibly shut down workers.
305242 */
306- DisconnectDatabase (si -> AHX );
243+ if (si -> AHX )
244+ DisconnectDatabase (si -> AHX );
245+
307246#ifndef WIN32
308247
309248 /*
310- * Setting aborting to true switches to best-effort-mode
311- * (send/receive but ignore errors) in communicating with our
312- * workers.
249+ * Setting aborting to true shuts off error/warning messages that
250+ * are no longer useful once we start killing workers.
313251 */
314252 aborting = true;
315253#endif
316254 ShutdownWorkersHard (si -> pstate );
317255 }
318- else if (slot -> args -> AH )
319- DisconnectDatabase (& (slot -> args -> AH -> public ));
256+ else
257+ {
258+ /*
259+ * We're a worker. Shut down our own DB connection if any. On
260+ * Windows, we also have to close our communication sockets, to
261+ * emulate what will happen on Unix when the worker process exits.
262+ * (Without this, if this is a premature exit, the master would
263+ * fail to detect it because there would be no EOF condition on
264+ * the other end of the pipe.)
265+ */
266+ if (slot -> args -> AH )
267+ DisconnectDatabase (& (slot -> args -> AH -> public ));
268+
269+ #ifdef WIN32
270+ closesocket (slot -> pipeRevRead );
271+ closesocket (slot -> pipeRevWrite );
272+ #endif
273+ }
274+ }
275+ else
276+ {
277+ /* Non-parallel operation: just kill the master DB connection */
278+ if (si -> AHX )
279+ DisconnectDatabase (si -> AHX );
320280 }
321- else if (si -> AHX )
322- DisconnectDatabase (si -> AHX );
323281}
324282
325283/*
326284 * If we have one worker that terminates for some reason, we'd like the other
327285 * threads to terminate as well (and not finish with their 70 GB table dump
328286 * first...). Now in UNIX we can just kill these processes, and let the signal
329287 * handler set wantAbort to 1. In Windows we set a termEvent and this serves
330- * as the signal for everyone to terminate.
288+ * as the signal for everyone to terminate. We don't print any error message,
289+ * that would just clutter the screen.
331290 */
332291void
333292checkAborting (ArchiveHandle * AH )
@@ -337,7 +296,7 @@ checkAborting(ArchiveHandle *AH)
337296#else
338297 if (wantAbort )
339298#endif
340- exit_horribly ( modulename , "worker is terminating\n" );
299+ exit_nicely ( 1 );
341300}
342301
343302/*
@@ -352,8 +311,6 @@ ShutdownWorkersHard(ParallelState *pstate)
352311#ifndef WIN32
353312 int i ;
354313
355- signal (SIGPIPE , SIG_IGN );
356-
357314 /*
358315 * Close our write end of the sockets so that the workers know they can
359316 * exit.
@@ -428,27 +385,21 @@ sigTermHandler(int signum)
428385#endif
429386
430387/*
431- * This function is called by both UNIX and Windows variants to set up a
432- * worker process.
388+ * This function is called by both UNIX and Windows variants to set up
389+ * and run a worker process. Caller should exit the process (or thread)
390+ * upon return.
433391 */
434392static void
435393SetupWorker (ArchiveHandle * AH , int pipefd [2 ], int worker )
436394{
437395 /*
438396 * Call the setup worker function that's defined in the ArchiveHandle.
439- *
440- * We get the raw connection only for the reason that we can close it
441- * properly when we shut down. This happens only that way when it is
442- * brought down because of an error.
443397 */
444398 (AH -> SetupWorkerPtr ) ((Archive * ) AH );
445399
446400 Assert (AH -> connection != NULL );
447401
448402 WaitForCommands (AH , pipefd );
449-
450- closesocket (pipefd [PIPE_READ ]);
451- closesocket (pipefd [PIPE_WRITE ]);
452403}
453404
454405#ifdef WIN32
@@ -534,14 +485,22 @@ ParallelBackupStart(ArchiveHandle *AH)
534485 pstate -> parallelSlot [i ].args = (ParallelArgs * ) pg_malloc (sizeof (ParallelArgs ));
535486 pstate -> parallelSlot [i ].args -> AH = NULL ;
536487 pstate -> parallelSlot [i ].args -> te = NULL ;
488+
489+ /* master's ends of the pipes */
490+ pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
491+ pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
492+ /* child's ends of the pipes */
493+ pstate -> parallelSlot [i ].pipeRevRead = pipeMW [PIPE_READ ];
494+ pstate -> parallelSlot [i ].pipeRevWrite = pipeWM [PIPE_WRITE ];
495+
537496#ifdef WIN32
538497 /* Allocate a new structure for every worker */
539498 wi = (WorkerInfo * ) pg_malloc (sizeof (WorkerInfo ));
540499
541500 wi -> worker = i ;
542501 wi -> AH = AH ;
543- wi -> pipeRead = pstate -> parallelSlot [ i ]. pipeRevRead = pipeMW [PIPE_READ ];
544- wi -> pipeWrite = pstate -> parallelSlot [ i ]. pipeRevWrite = pipeWM [PIPE_WRITE ];
502+ wi -> pipeRead = pipeMW [PIPE_READ ];
503+ wi -> pipeWrite = pipeWM [PIPE_WRITE ];
545504
546505 handle = _beginthreadex (NULL , 0 , (void * ) & init_spawned_worker_win32 ,
547506 wi , 0 , & (pstate -> parallelSlot [i ].threadId ));
@@ -557,15 +516,6 @@ ParallelBackupStart(ArchiveHandle *AH)
557516 pipefd [0 ] = pipeMW [PIPE_READ ];
558517 pipefd [1 ] = pipeWM [PIPE_WRITE ];
559518
560- /*
561- * Store the fds for the reverse communication in pstate. Actually
562- * we only use this in case of an error and don't use pstate
563- * otherwise in the worker process. On Windows we write to the
564- * global pstate, in Unix we write to our process-local copy but
565- * that's also where we'd retrieve this information back from.
566- */
567- pstate -> parallelSlot [i ].pipeRevRead = pipefd [PIPE_READ ];
568- pstate -> parallelSlot [i ].pipeRevWrite = pipefd [PIPE_WRITE ];
569519 pstate -> parallelSlot [i ].pid = getpid ();
570520
571521 /*
@@ -584,7 +534,7 @@ ParallelBackupStart(ArchiveHandle *AH)
584534
585535 /*
586536 * Close all inherited fds for communication of the master with
587- * the other workers.
537+ * previously-forked workers.
588538 */
589539 for (j = 0 ; j < i ; j ++ )
590540 {
@@ -612,11 +562,16 @@ ParallelBackupStart(ArchiveHandle *AH)
612562
613563 pstate -> parallelSlot [i ].pid = pid ;
614564#endif
615-
616- pstate -> parallelSlot [i ].pipeRead = pipeWM [PIPE_READ ];
617- pstate -> parallelSlot [i ].pipeWrite = pipeMW [PIPE_WRITE ];
618565 }
619566
567+ /*
568+ * Having forked off the workers, disable SIGPIPE so that master isn't
569+ * killed if it tries to send a command to a dead worker.
570+ */
571+ #ifndef WIN32
572+ signal (SIGPIPE , SIG_IGN );
573+ #endif
574+
620575 return pstate ;
621576}
622577
@@ -977,16 +932,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
977932 }
978933 else
979934 exit_horribly (modulename ,
980- "invalid message received from worker: %s\n" , msg );
981- }
982- else if (messageStartsWith (msg , "ERROR " ))
983- {
984- Assert (AH -> format == archDirectory || AH -> format == archCustom );
985- pstate -> parallelSlot [worker ].workerStatus = WRKR_TERMINATED ;
986- exit_horribly (modulename , "%s" , msg + strlen ("ERROR " ));
935+ "invalid message received from worker: \"%s\"\n" ,
936+ msg );
987937 }
988938 else
989- exit_horribly (modulename , "invalid message received from worker: %s\n" , msg );
939+ exit_horribly (modulename ,
940+ "invalid message received from worker: \"%s\"\n" ,
941+ msg );
990942
991943 /* both Unix and Win32 return pg_malloc()ed space, so we free it */
992944 free (msg );
0 commit comments