1919#include "access/xlog.h"
2020#include "catalog/namespace.h"
2121#include "commands/async.h"
22+ #include "executor/execParallel.h"
2223#include "libpq/libpq.h"
2324#include "libpq/pqformat.h"
2425#include "libpq/pqmq.h"
6162#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
6263#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
6364#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
64- #define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009)
65+ #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
6566
6667/* Fixed-size parallel state. */
6768typedef struct FixedParallelState
@@ -77,9 +78,6 @@ typedef struct FixedParallelState
7778 pid_t parallel_master_pid ;
7879 BackendId parallel_master_backend_id ;
7980
80- /* Entrypoint for parallel workers. */
81- parallel_worker_main_type entrypoint ;
82-
8381 /* Mutex protects remaining fields. */
8482 slock_t mutex ;
8583
@@ -107,10 +105,26 @@ static FixedParallelState *MyFixedParallelState;
107105/* List of active parallel contexts. */
108106static dlist_head pcxt_list = DLIST_STATIC_INIT (pcxt_list );
109107
108+ /*
109+ * List of internal parallel worker entry points. We need this for
110+ * reasons explained in LookupParallelWorkerFunction(), below.
111+ */
112+ static const struct
113+ {
114+ const char * fn_name ;
115+ parallel_worker_main_type fn_addr ;
116+ } InternalParallelWorkers [] =
117+
118+ {
119+ {
120+ "ParallelQueryMain" , ParallelQueryMain
121+ }
122+ };
123+
110124/* Private functions. */
111125static void HandleParallelMessage (ParallelContext * pcxt , int i , StringInfo msg );
112- static void ParallelExtensionTrampoline (dsm_segment * seg , shm_toc * toc );
113126static void WaitForParallelWorkersToExit (ParallelContext * pcxt );
127+ static parallel_worker_main_type LookupParallelWorkerFunction (const char * libraryname , const char * funcname );
114128
115129
116130/*
@@ -119,7 +133,8 @@ static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
119133 * destroyed before exiting the current subtransaction.
120134 */
121135ParallelContext *
122- CreateParallelContext (parallel_worker_main_type entrypoint , int nworkers )
136+ CreateParallelContext (const char * library_name , const char * function_name ,
137+ int nworkers )
123138{
124139 MemoryContext oldcontext ;
125140 ParallelContext * pcxt ;
@@ -152,7 +167,8 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
152167 pcxt = palloc0 (sizeof (ParallelContext ));
153168 pcxt -> subid = GetCurrentSubTransactionId ();
154169 pcxt -> nworkers = nworkers ;
155- pcxt -> entrypoint = entrypoint ;
170+ pcxt -> library_name = pstrdup (library_name );
171+ pcxt -> function_name = pstrdup (function_name );
156172 pcxt -> error_context_stack = error_context_stack ;
157173 shm_toc_initialize_estimator (& pcxt -> estimator );
158174 dlist_push_head (& pcxt_list , & pcxt -> node );
@@ -163,33 +179,6 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
163179 return pcxt ;
164180}
165181
166- /*
167- * Establish a new parallel context that calls a function provided by an
168- * extension. This works around the fact that the library might get mapped
169- * at a different address in each backend.
170- */
171- ParallelContext *
172- CreateParallelContextForExternalFunction (char * library_name ,
173- char * function_name ,
174- int nworkers )
175- {
176- MemoryContext oldcontext ;
177- ParallelContext * pcxt ;
178-
179- /* We might be running in a very short-lived memory context. */
180- oldcontext = MemoryContextSwitchTo (TopTransactionContext );
181-
182- /* Create the context. */
183- pcxt = CreateParallelContext (ParallelExtensionTrampoline , nworkers );
184- pcxt -> library_name = pstrdup (library_name );
185- pcxt -> function_name = pstrdup (function_name );
186-
187- /* Restore previous memory context. */
188- MemoryContextSwitchTo (oldcontext );
189-
190- return pcxt ;
191- }
192-
193182/*
194183 * Establish the dynamic shared memory segment for a parallel context and
195184 * copy state and other bookkeeping information that will be needed by
@@ -249,15 +238,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
249238 pcxt -> nworkers ));
250239 shm_toc_estimate_keys (& pcxt -> estimator , 1 );
251240
252- /* Estimate how much we'll need for extension entrypoint info. */
253- if (pcxt -> library_name != NULL )
254- {
255- Assert (pcxt -> entrypoint == ParallelExtensionTrampoline );
256- Assert (pcxt -> function_name != NULL );
257- shm_toc_estimate_chunk (& pcxt -> estimator , strlen (pcxt -> library_name )
258- + strlen (pcxt -> function_name ) + 2 );
259- shm_toc_estimate_keys (& pcxt -> estimator , 1 );
260- }
241+ /* Estimate how much we'll need for the entrypoint info. */
242+ shm_toc_estimate_chunk (& pcxt -> estimator , strlen (pcxt -> library_name ) +
243+ strlen (pcxt -> function_name ) + 2 );
244+ shm_toc_estimate_keys (& pcxt -> estimator , 1 );
261245 }
262246
263247 /*
@@ -297,7 +281,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
297281 fps -> parallel_master_pgproc = MyProc ;
298282 fps -> parallel_master_pid = MyProcPid ;
299283 fps -> parallel_master_backend_id = MyBackendId ;
300- fps -> entrypoint = pcxt -> entrypoint ;
301284 SpinLockInit (& fps -> mutex );
302285 fps -> last_xlog_end = 0 ;
303286 shm_toc_insert (pcxt -> toc , PARALLEL_KEY_FIXED , fps );
@@ -312,6 +295,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
312295 char * asnapspace ;
313296 char * tstatespace ;
314297 char * error_queue_space ;
298+ char * entrypointstate ;
299+ Size lnamelen ;
315300
316301 /* Serialize shared libraries we have loaded. */
317302 libraryspace = shm_toc_allocate (pcxt -> toc , library_len );
@@ -368,19 +353,19 @@ InitializeParallelDSM(ParallelContext *pcxt)
368353 }
369354 shm_toc_insert (pcxt -> toc , PARALLEL_KEY_ERROR_QUEUE , error_queue_space );
370355
371- /* Serialize extension entrypoint information. */
372- if ( pcxt -> library_name != NULL )
373- {
374- Size lnamelen = strlen ( pcxt -> library_name );
375- char * extensionstate ;
376-
377- extensionstate = shm_toc_allocate ( pcxt -> toc , lnamelen
378- + strlen (pcxt -> function_name ) + 2 );
379- strcpy ( extensionstate , pcxt -> library_name );
380- strcpy ( extensionstate + lnamelen + 1 , pcxt -> function_name );
381- shm_toc_insert ( pcxt -> toc , PARALLEL_KEY_EXTENSION_TRAMPOLINE ,
382- extensionstate );
383- }
356+ /*
357+ * Serialize entrypoint information. It's unsafe to pass function
358+ * pointers across processes, as the function pointer may be different
359+ * in each process in EXEC_BACKEND builds, so we always pass library
360+ * and function name. (We use library name "postgres" for functions
361+ * in the core backend.)
362+ */
363+ lnamelen = strlen (pcxt -> library_name );
364+ entrypointstate = shm_toc_allocate ( pcxt -> toc , lnamelen +
365+ strlen ( pcxt -> function_name ) + 2 );
366+ strcpy ( entrypointstate , pcxt -> library_name );
367+ strcpy ( entrypointstate + lnamelen + 1 , pcxt -> function_name );
368+ shm_toc_insert ( pcxt -> toc , PARALLEL_KEY_ENTRYPOINT , entrypointstate );
384369 }
385370
386371 /* Restore previous memory context. */
@@ -671,6 +656,8 @@ DestroyParallelContext(ParallelContext *pcxt)
671656 }
672657
673658 /* Free memory. */
659+ pfree (pcxt -> library_name );
660+ pfree (pcxt -> function_name );
674661 pfree (pcxt );
675662}
676663
@@ -941,6 +928,10 @@ ParallelWorkerMain(Datum main_arg)
941928 shm_mq * mq ;
942929 shm_mq_handle * mqh ;
943930 char * libraryspace ;
931+ char * entrypointstate ;
932+ char * library_name ;
933+ char * function_name ;
934+ parallel_worker_main_type entrypt ;
944935 char * gucspace ;
945936 char * combocidspace ;
946937 char * tsnapspace ;
@@ -1040,6 +1031,18 @@ ParallelWorkerMain(Datum main_arg)
10401031 Assert (libraryspace != NULL );
10411032 RestoreLibraryState (libraryspace );
10421033
1034+ /*
1035+ * Identify the entry point to be called. In theory this could result in
1036+ * loading an additional library, though most likely the entry point is in
1037+ * the core backend or in a library we just loaded.
1038+ */
1039+ entrypointstate = shm_toc_lookup (toc , PARALLEL_KEY_ENTRYPOINT );
1040+ Assert (entrypointstate != NULL );
1041+ library_name = entrypointstate ;
1042+ function_name = entrypointstate + strlen (library_name ) + 1 ;
1043+
1044+ entrypt = LookupParallelWorkerFunction (library_name , function_name );
1045+
10431046 /* Restore database connection. */
10441047 BackgroundWorkerInitializeConnectionByOid (fps -> database_id ,
10451048 fps -> authenticated_user_id );
@@ -1102,11 +1105,8 @@ ParallelWorkerMain(Datum main_arg)
11021105
11031106 /*
11041107 * Time to do the real work: invoke the caller-supplied code.
1105- *
1106- * If you get a crash at this line, see the comments for
1107- * ParallelExtensionTrampoline.
11081108 */
1109- fps -> entrypoint (seg , toc );
1109+ entrypt (seg , toc );
11101110
11111111 /* Must exit parallel mode to pop active snapshot. */
11121112 ExitParallelMode ();
@@ -1121,33 +1121,6 @@ ParallelWorkerMain(Datum main_arg)
11211121 pq_putmessage ('X' , NULL , 0 );
11221122}
11231123
1124- /*
1125- * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a
1126- * function living in a dynamically loaded module, because the module might
1127- * not be loaded in every process, or might be loaded but not at the same
1128- * address. To work around that problem, CreateParallelContextForExtension()
1129- * arranges to call this function rather than calling the extension-provided
1130- * function directly; and this function then looks up the real entrypoint and
1131- * calls it.
1132- */
1133- static void
1134- ParallelExtensionTrampoline (dsm_segment * seg , shm_toc * toc )
1135- {
1136- char * extensionstate ;
1137- char * library_name ;
1138- char * function_name ;
1139- parallel_worker_main_type entrypt ;
1140-
1141- extensionstate = shm_toc_lookup (toc , PARALLEL_KEY_EXTENSION_TRAMPOLINE );
1142- Assert (extensionstate != NULL );
1143- library_name = extensionstate ;
1144- function_name = extensionstate + strlen (library_name ) + 1 ;
1145-
1146- entrypt = (parallel_worker_main_type )
1147- load_external_function (library_name , function_name , true, NULL );
1148- entrypt (seg , toc );
1149- }
1150-
11511124/*
11521125 * Update shared memory with the ending location of the last WAL record we
11531126 * wrote, if it's greater than the value already stored there.
@@ -1163,3 +1136,47 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
11631136 fps -> last_xlog_end = last_xlog_end ;
11641137 SpinLockRelease (& fps -> mutex );
11651138}
1139+
1140+ /*
1141+ * Look up (and possibly load) a parallel worker entry point function.
1142+ *
1143+ * For functions contained in the core code, we use library name "postgres"
1144+ * and consult the InternalParallelWorkers array. External functions are
1145+ * looked up, and loaded if necessary, using load_external_function().
1146+ *
1147+ * The point of this is to pass function names as strings across process
1148+ * boundaries. We can't pass actual function addresses because of the
1149+ * possibility that the function has been loaded at a different address
1150+ * in a different process. This is obviously a hazard for functions in
1151+ * loadable libraries, but it can happen even for functions in the core code
1152+ * on platforms using EXEC_BACKEND (e.g., Windows).
1153+ *
1154+ * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1155+ * in favor of applying load_external_function() for core functions too;
1156+ * but that raises portability issues that are not worth addressing now.
1157+ */
1158+ static parallel_worker_main_type
1159+ LookupParallelWorkerFunction (const char * libraryname , const char * funcname )
1160+ {
1161+ /*
1162+ * If the function is to be loaded from postgres itself, search the
1163+ * InternalParallelWorkers array.
1164+ */
1165+ if (strcmp (libraryname , "postgres" ) == 0 )
1166+ {
1167+ int i ;
1168+
1169+ for (i = 0 ; i < lengthof (InternalParallelWorkers ); i ++ )
1170+ {
1171+ if (strcmp (InternalParallelWorkers [i ].fn_name , funcname ) == 0 )
1172+ return InternalParallelWorkers [i ].fn_addr ;
1173+ }
1174+
1175+ /* We can only reach this by programming error. */
1176+ elog (ERROR , "internal function \"%s\" not found" , funcname );
1177+ }
1178+
1179+ /* Otherwise load from external library. */
1180+ return (parallel_worker_main_type )
1181+ load_external_function (libraryname , funcname , true, NULL );
1182+ }
0 commit comments