@@ -437,10 +437,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
437437 WaitForParallelWorkersToFinish (pcxt );
438438 WaitForParallelWorkersToExit (pcxt );
439439 pcxt -> nworkers_launched = 0 ;
440- if (pcxt -> any_message_received )
440+ if (pcxt -> known_attached_workers )
441441 {
442- pfree (pcxt -> any_message_received );
443- pcxt -> any_message_received = NULL ;
442+ pfree (pcxt -> known_attached_workers );
443+ pcxt -> known_attached_workers = NULL ;
444+ pcxt -> nknown_attached_workers = 0 ;
444445 }
445446 }
446447
@@ -542,16 +543,147 @@ LaunchParallelWorkers(ParallelContext *pcxt)
542543
543544 /*
544545 * Now that nworkers_launched has taken its final value, we can initialize
545- * any_message_received .
546+ * known_attached_workers .
546547 */
547548 if (pcxt -> nworkers_launched > 0 )
548- pcxt -> any_message_received =
549+ {
550+ pcxt -> known_attached_workers =
549551 palloc0 (sizeof (bool ) * pcxt -> nworkers_launched );
552+ pcxt -> nknown_attached_workers = 0 ;
553+ }
550554
551555 /* Restore previous memory context. */
552556 MemoryContextSwitchTo (oldcontext );
553557}
554558
559+ /*
560+ * Wait for all workers to attach to their error queues, and throw an error if
561+ * any worker fails to do this.
562+ *
563+ * Callers can assume that if this function returns successfully, then the
564+ * number of workers given by pcxt->nworkers_launched have initialized and
565+ * attached to their error queues. Whether or not these workers are guaranteed
566+ * to still be running depends on what code the caller asked them to run;
567+ * this function does not guarantee that they have not exited. However, it
568+ * does guarantee that any workers which exited must have done so cleanly and
569+ * after successfully performing the work with which they were tasked.
570+ *
571+ * If this function is not called, then some of the workers that were launched
572+ * may not have been started due to a fork() failure, or may have exited during
573+ * early startup prior to attaching to the error queue, so nworkers_launched
574+ * cannot be viewed as completely reliable. It will never be less than the
575+ * number of workers which actually started, but it might be more. Any workers
576+ * that failed to start will still be discovered by
577+ * WaitForParallelWorkersToFinish and an error will be thrown at that time,
578+ * provided that function is eventually reached.
579+ *
580+ * In general, the leader process should do as much work as possible before
581+ * calling this function. fork() failures and other early-startup failures
582+ * are very uncommon, and having the leader sit idle when it could be doing
583+ * useful work is undesirable. However, if the leader needs to wait for
584+ * all of its workers or for a specific worker, it may want to call this
585+ * function before doing so. If not, it must make some other provision for
586+ * the failure-to-start case, lest it wait forever. On the other hand, a
587+ * leader which never waits for a worker that might not be started yet, or
588+ * at least never does so prior to WaitForParallelWorkersToFinish(), need not
589+ * call this function at all.
590+ */
591+ void
592+ WaitForParallelWorkersToAttach (ParallelContext * pcxt )
593+ {
594+ int i ;
595+
596+ /* Skip this if we have no launched workers. */
597+ if (pcxt -> nworkers_launched == 0 )
598+ return ;
599+
600+ for (;;)
601+ {
602+ /*
603+ * This will process any parallel messages that are pending and it may
604+ * also throw an error propagated from a worker.
605+ */
606+ CHECK_FOR_INTERRUPTS ();
607+
608+ for (i = 0 ; i < pcxt -> nworkers_launched ; ++ i )
609+ {
610+ BgwHandleStatus status ;
611+ shm_mq * mq ;
612+ int rc ;
613+ pid_t pid ;
614+
615+ if (pcxt -> known_attached_workers [i ])
616+ continue ;
617+
618+ /*
619+ * If error_mqh is NULL, then the worker has already exited
620+ * cleanly.
621+ */
622+ if (pcxt -> worker [i ].error_mqh == NULL )
623+ {
624+ pcxt -> known_attached_workers [i ] = true;
625+ ++ pcxt -> nknown_attached_workers ;
626+ continue ;
627+ }
628+
629+ status = GetBackgroundWorkerPid (pcxt -> worker [i ].bgwhandle , & pid );
630+ if (status == BGWH_STARTED )
631+ {
632+ /* Has the worker attached to the error queue? */
633+ mq = shm_mq_get_queue (pcxt -> worker [i ].error_mqh );
634+ if (shm_mq_get_sender (mq ) != NULL )
635+ {
636+ /* Yes, so it is known to be attached. */
637+ pcxt -> known_attached_workers [i ] = true;
638+ ++ pcxt -> nknown_attached_workers ;
639+ }
640+ }
641+ else if (status == BGWH_STOPPED )
642+ {
643+ /*
644+ * If the worker stopped without attaching to the error queue,
645+ * throw an error.
646+ */
647+ mq = shm_mq_get_queue (pcxt -> worker [i ].error_mqh );
648+ if (shm_mq_get_sender (mq ) == NULL )
649+ ereport (ERROR ,
650+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
651+ errmsg ("parallel worker failed to initialize" ),
652+ errhint ("More details may be available in the server log." )));
653+
654+ pcxt -> known_attached_workers [i ] = true;
655+ ++ pcxt -> nknown_attached_workers ;
656+ }
657+ else
658+ {
659+ /*
660+ * Worker not yet started, so we must wait. The postmaster
661+ * will notify us if the worker's state changes. Our latch
662+ * might also get set for some other reason, but if so we'll
663+ * just end up waiting for the same worker again.
664+ */
665+ rc = WaitLatch (MyLatch ,
666+ WL_LATCH_SET | WL_POSTMASTER_DEATH ,
667+ -1 , WAIT_EVENT_BGWORKER_STARTUP );
668+
669+ /* emergency bailout if postmaster has died */
670+ if (rc & WL_POSTMASTER_DEATH )
671+ proc_exit (1 );
672+
673+ if (rc & WL_LATCH_SET )
674+ ResetLatch (MyLatch );
675+ }
676+ }
677+
678+ /* If all workers are known to have started, we're done. */
679+ if (pcxt -> nknown_attached_workers >= pcxt -> nworkers_launched )
680+ {
681+ Assert (pcxt -> nknown_attached_workers == pcxt -> nworkers_launched );
682+ break ;
683+ }
684+ }
685+ }
686+
555687/*
556688 * Wait for all workers to finish computing.
557689 *
@@ -589,7 +721,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
589721 */
590722 if (pcxt -> worker [i ].error_mqh == NULL )
591723 ++ nfinished ;
592- else if (pcxt -> any_message_received [i ])
724+ else if (pcxt -> known_attached_workers [i ])
593725 {
594726 anyone_alive = true;
595727 break ;
@@ -909,8 +1041,12 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
9091041{
9101042 char msgtype ;
9111043
912- if (pcxt -> any_message_received != NULL )
913- pcxt -> any_message_received [i ] = true;
1044+ if (pcxt -> known_attached_workers != NULL &&
1045+ !pcxt -> known_attached_workers [i ])
1046+ {
1047+ pcxt -> known_attached_workers [i ] = true;
1048+ pcxt -> nknown_attached_workers ++ ;
1049+ }
9141050
9151051 msgtype = pq_getmsgbyte (msg );
9161052
0 commit comments