1414
1515#include "postgres.h"
1616
17+ #include "access/parallel.h"
1718#include "access/xact.h"
1819#include "access/xlog.h"
19- #include "access/parallel.h"
2020#include "commands/async.h"
2121#include "libpq/libpq.h"
2222#include "libpq/pqformat.h"
3333#include "utils/resowner.h"
3434#include "utils/snapmgr.h"
3535
36+
3637/*
3738 * We don't want to waste a lot of memory on an error queue which, most of
3839 * the time, will process only a handful of small messages. However, it is
@@ -90,7 +91,7 @@ typedef struct FixedParallelState
9091int ParallelWorkerNumber = -1 ;
9192
9293/* Is there a parallel message pending which we need to receive? */
93- bool ParallelMessagePending = false;
94+ volatile bool ParallelMessagePending = false;
9495
9596/* Are we initializing a parallel worker? */
9697bool InitializingParallelWorker = false;
@@ -102,11 +103,12 @@ static FixedParallelState *MyFixedParallelState;
102103static dlist_head pcxt_list = DLIST_STATIC_INIT (pcxt_list );
103104
104105/* Private functions. */
105- static void HandleParallelMessage (ParallelContext * , int , StringInfo msg );
106+ static void HandleParallelMessage (ParallelContext * pcxt , int i , StringInfo msg );
106107static void ParallelErrorContext (void * arg );
107108static void ParallelExtensionTrampoline (dsm_segment * seg , shm_toc * toc );
108109static void ParallelWorkerMain (Datum main_arg );
109110
111+
110112/*
111113 * Establish a new parallel context. This should be done after entering
112114 * parallel mode, and (unless there is an error) the context should be
@@ -178,8 +180,8 @@ CreateParallelContextForExternalFunction(char *library_name,
178180
179181/*
180182 * Establish the dynamic shared memory segment for a parallel context and
181- * copied state and other bookkeeping information that will need by parallel
182- * workers into it.
183+ * copy state and other bookkeeping information that will be needed by
184+ * parallel workers into it.
183185 */
184186void
185187InitializeParallelDSM (ParallelContext * pcxt )
@@ -231,7 +233,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
231233 PARALLEL_ERROR_QUEUE_SIZE ,
232234 "parallel error queue size not buffer-aligned" );
233235 shm_toc_estimate_chunk (& pcxt -> estimator ,
234- PARALLEL_ERROR_QUEUE_SIZE * pcxt -> nworkers );
236+ mul_size (PARALLEL_ERROR_QUEUE_SIZE ,
237+ pcxt -> nworkers ));
235238 shm_toc_estimate_keys (& pcxt -> estimator , 1 );
236239
237240 /* Estimate how much we'll need for extension entrypoint info. */
@@ -257,7 +260,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
257260 * parallelism than to fail outright.
258261 */
259262 segsize = shm_toc_estimate (& pcxt -> estimator );
260- if (pcxt -> nworkers != 0 )
263+ if (pcxt -> nworkers > 0 )
261264 pcxt -> seg = dsm_create (segsize , DSM_CREATE_NULL_IF_MAXSEGMENTS );
262265 if (pcxt -> seg != NULL )
263266 pcxt -> toc = shm_toc_create (PARALLEL_MAGIC ,
@@ -337,7 +340,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
337340 */
338341 error_queue_space =
339342 shm_toc_allocate (pcxt -> toc ,
340- PARALLEL_ERROR_QUEUE_SIZE * pcxt -> nworkers );
343+ mul_size (PARALLEL_ERROR_QUEUE_SIZE ,
344+ pcxt -> nworkers ));
341345 for (i = 0 ; i < pcxt -> nworkers ; ++ i )
342346 {
343347 char * start ;
@@ -603,17 +607,17 @@ ParallelContextActive(void)
603607
604608/*
605609 * Handle receipt of an interrupt indicating a parallel worker message.
610+ *
611+ * Note: this is called within a signal handler! All we can do is set
612+ * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
613+ * HandleParallelMessages().
606614 */
607615void
608616HandleParallelMessageInterrupt (void )
609617{
610- int save_errno = errno ;
611-
612618 InterruptPending = true;
613619 ParallelMessagePending = true;
614620 SetLatch (MyLatch );
615-
616- errno = save_errno ;
617621}
618622
619623/*
@@ -664,11 +668,8 @@ HandleParallelMessages(void)
664668 }
665669 else
666670 ereport (ERROR ,
667- (errcode (ERRCODE_INTERNAL_ERROR ), /* XXX: wrong errcode? */
668- errmsg ("lost connection to parallel worker" )));
669-
670- /* This might make the error queue go away. */
671- CHECK_FOR_INTERRUPTS ();
671+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
672+ errmsg ("lost connection to parallel worker" )));
672673 }
673674 }
674675 }
@@ -714,7 +715,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
714715 errctx .previous = pcxt -> error_context_stack ;
715716 error_context_stack = & errctx ;
716717
717- /* Parse ErrorReponse or NoticeResponse. */
718+ /* Parse ErrorResponse or NoticeResponse. */
718719 pq_parse_errornotice (msg , & edata );
719720
720721 /* Death of a worker isn't enough justification for suicide. */
@@ -747,7 +748,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
747748
748749 default :
749750 {
750- elog (ERROR , "unknown message type: %c (%d bytes)" ,
751+ elog (ERROR , "unrecognized message type received from parallel worker : %c (message length %d bytes)" ,
751752 msgtype , msg -> len );
752753 }
753754 }
@@ -847,7 +848,7 @@ ParallelWorkerMain(Datum main_arg)
847848 if (toc == NULL )
848849 ereport (ERROR ,
849850 (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
850- errmsg ("invalid magic number in dynamic shared memory segment" )));
851+ errmsg ("invalid magic number in dynamic shared memory segment" )));
851852
852853 /* Look up fixed parallel state. */
853854 fps = shm_toc_lookup (toc , PARALLEL_KEY_FIXED );
0 commit comments