1919#include "access/xlog.h"
2020#include "catalog/namespace.h"
2121#include "commands/async.h"
22+ #include "executor/execParallel.h"
2223#include "libpq/libpq.h"
2324#include "libpq/pqformat.h"
2425#include "libpq/pqmq.h"
6061#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
6162#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
6263#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
63- #define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009)
64+ #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
6465
6566/* Fixed-size parallel state. */
6667typedef struct FixedParallelState
@@ -76,7 +77,7 @@ typedef struct FixedParallelState
7677 pid_t parallel_master_pid ;
7778 BackendId parallel_master_backend_id ;
7879
79- /* Entrypoint for parallel workers. */
80+ /* Entrypoint for parallel workers (deprecated)! */
8081 parallel_worker_main_type entrypoint ;
8182
8283 /* Mutex protects remaining fields. */
@@ -106,16 +107,36 @@ static FixedParallelState *MyFixedParallelState;
106107/* List of active parallel contexts. */
107108static dlist_head pcxt_list = DLIST_STATIC_INIT (pcxt_list );
108109
110+ /*
111+ * List of internal parallel worker entry points. We need this for
112+ * reasons explained in LookupParallelWorkerFunction(), below.
113+ */
114+ static const struct
115+ {
116+ const char * fn_name ;
117+ parallel_worker_main_type fn_addr ;
118+ } InternalParallelWorkers [] =
119+
120+ {
121+ {
122+ "ParallelQueryMain" , ParallelQueryMain
123+ }
124+ };
125+
109126/* Private functions. */
110127static void HandleParallelMessage (ParallelContext * pcxt , int i , StringInfo msg );
111- static void ParallelExtensionTrampoline (dsm_segment * seg , shm_toc * toc );
112128static void WaitForParallelWorkersToExit (ParallelContext * pcxt );
129+ static parallel_worker_main_type LookupParallelWorkerFunction (char * libraryname , char * funcname );
113130
114131
115132/*
116133 * Establish a new parallel context. This should be done after entering
117134 * parallel mode, and (unless there is an error) the context should be
118135 * destroyed before exiting the current subtransaction.
136+ *
137+ * NB: specifying the entrypoint as a function address is unportable.
138+ * This will go away in Postgres 10, in favor of the API provided by
139+ * CreateParallelContextForExternalFunction; in the meantime use that.
119140 */
120141ParallelContext *
121142CreateParallelContext (parallel_worker_main_type entrypoint , int nworkers )
@@ -163,9 +184,9 @@ CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
163184}
164185
165186/*
166- * Establish a new parallel context that calls a function provided by an
167- * extension. This works around the fact that the library might get mapped
168- * at a different address in each backend .
187+ * Establish a new parallel context that calls a function specified by name.
188+ * Unlike CreateParallelContext, this is robust against possible differences
189+ * in address space layout between different processes .
169190 */
170191ParallelContext *
171192CreateParallelContextForExternalFunction (char * library_name ,
@@ -179,7 +200,7 @@ CreateParallelContextForExternalFunction(char *library_name,
179200 oldcontext = MemoryContextSwitchTo (TopTransactionContext );
180201
181202 /* Create the context. */
182- pcxt = CreateParallelContext (ParallelExtensionTrampoline , nworkers );
203+ pcxt = CreateParallelContext (NULL , nworkers );
183204 pcxt -> library_name = pstrdup (library_name );
184205 pcxt -> function_name = pstrdup (function_name );
185206
@@ -248,10 +269,9 @@ InitializeParallelDSM(ParallelContext *pcxt)
248269 pcxt -> nworkers ));
249270 shm_toc_estimate_keys (& pcxt -> estimator , 1 );
250271
251- /* Estimate how much we'll need for extension entrypoint info. */
272+ /* Estimate how much we'll need for entrypoint info. */
252273 if (pcxt -> library_name != NULL )
253274 {
254- Assert (pcxt -> entrypoint == ParallelExtensionTrampoline );
255275 Assert (pcxt -> function_name != NULL );
256276 shm_toc_estimate_chunk (& pcxt -> estimator , strlen (pcxt -> library_name )
257277 + strlen (pcxt -> function_name ) + 2 );
@@ -367,7 +387,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
367387 }
368388 shm_toc_insert (pcxt -> toc , PARALLEL_KEY_ERROR_QUEUE , error_queue_space );
369389
370- /* Serialize extension entrypoint information. */
390+ /* Serialize entrypoint information. */
371391 if (pcxt -> library_name != NULL )
372392 {
373393 Size lnamelen = strlen (pcxt -> library_name );
@@ -377,7 +397,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
377397 + strlen (pcxt -> function_name ) + 2 );
378398 strcpy (extensionstate , pcxt -> library_name );
379399 strcpy (extensionstate + lnamelen + 1 , pcxt -> function_name );
380- shm_toc_insert (pcxt -> toc , PARALLEL_KEY_EXTENSION_TRAMPOLINE ,
400+ shm_toc_insert (pcxt -> toc , PARALLEL_KEY_ENTRYPOINT ,
381401 extensionstate );
382402 }
383403 }
@@ -669,6 +689,10 @@ DestroyParallelContext(ParallelContext *pcxt)
669689 }
670690
671691 /* Free memory. */
692+ if (pcxt -> library_name )
693+ pfree (pcxt -> library_name );
694+ if (pcxt -> function_name )
695+ pfree (pcxt -> function_name );
672696 pfree (pcxt );
673697}
674698
@@ -939,6 +963,8 @@ ParallelWorkerMain(Datum main_arg)
939963 shm_mq * mq ;
940964 shm_mq_handle * mqh ;
941965 char * libraryspace ;
966+ char * entrypointstate ;
967+ parallel_worker_main_type entrypt ;
942968 char * gucspace ;
943969 char * combocidspace ;
944970 char * tsnapspace ;
@@ -1038,6 +1064,25 @@ ParallelWorkerMain(Datum main_arg)
10381064 Assert (libraryspace != NULL );
10391065 RestoreLibraryState (libraryspace );
10401066
1067+ /*
1068+ * Identify the entry point to be called. In theory this could result in
1069+ * loading an additional library, though most likely the entry point is in
1070+ * the core backend or in a library we just loaded.
1071+ */
1072+ entrypointstate = shm_toc_lookup (toc , PARALLEL_KEY_ENTRYPOINT );
1073+ if (entrypointstate != NULL )
1074+ {
1075+ char * library_name ;
1076+ char * function_name ;
1077+
1078+ library_name = entrypointstate ;
1079+ function_name = entrypointstate + strlen (library_name ) + 1 ;
1080+
1081+ entrypt = LookupParallelWorkerFunction (library_name , function_name );
1082+ }
1083+ else
1084+ entrypt = fps -> entrypoint ;
1085+
10411086 /* Restore database connection. */
10421087 BackgroundWorkerInitializeConnectionByOid (fps -> database_id ,
10431088 fps -> authenticated_user_id );
@@ -1101,10 +1146,11 @@ ParallelWorkerMain(Datum main_arg)
11011146 /*
11021147 * Time to do the real work: invoke the caller-supplied code.
11031148 *
1104- * If you get a crash at this line, see the comments for
1105- * ParallelExtensionTrampoline.
1149+ * If you get a crash at this line, try using
1150+ * CreateParallelContextForExternalFunction instead of
1151+ * CreateParallelContext.
11061152 */
1107- fps -> entrypoint (seg , toc );
1153+ entrypt (seg , toc );
11081154
11091155 /* Must exit parallel mode to pop active snapshot. */
11101156 ExitParallelMode ();
@@ -1119,33 +1165,6 @@ ParallelWorkerMain(Datum main_arg)
11191165 pq_putmessage ('X' , NULL , 0 );
11201166}
11211167
1122- /*
1123- * It's unsafe for the entrypoint invoked by ParallelWorkerMain to be a
1124- * function living in a dynamically loaded module, because the module might
1125- * not be loaded in every process, or might be loaded but not at the same
1126- * address. To work around that problem, CreateParallelContextForExtension()
1127- * arranges to call this function rather than calling the extension-provided
1128- * function directly; and this function then looks up the real entrypoint and
1129- * calls it.
1130- */
1131- static void
1132- ParallelExtensionTrampoline (dsm_segment * seg , shm_toc * toc )
1133- {
1134- char * extensionstate ;
1135- char * library_name ;
1136- char * function_name ;
1137- parallel_worker_main_type entrypt ;
1138-
1139- extensionstate = shm_toc_lookup (toc , PARALLEL_KEY_EXTENSION_TRAMPOLINE );
1140- Assert (extensionstate != NULL );
1141- library_name = extensionstate ;
1142- function_name = extensionstate + strlen (library_name ) + 1 ;
1143-
1144- entrypt = (parallel_worker_main_type )
1145- load_external_function (library_name , function_name , true, NULL );
1146- entrypt (seg , toc );
1147- }
1148-
11491168/*
11501169 * Update shared memory with the ending location of the last WAL record we
11511170 * wrote, if it's greater than the value already stored there.
@@ -1161,3 +1180,47 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
11611180 fps -> last_xlog_end = last_xlog_end ;
11621181 SpinLockRelease (& fps -> mutex );
11631182}
1183+
1184+ /*
1185+ * Look up (and possibly load) a parallel worker entry point function.
1186+ *
1187+ * For functions contained in the core code, we use library name "postgres"
1188+ * and consult the InternalParallelWorkers array. External functions are
1189+ * looked up, and loaded if necessary, using load_external_function().
1190+ *
1191+ * The point of this is to pass function names as strings across process
1192+ * boundaries. We can't pass actual function addresses because of the
1193+ * possibility that the function has been loaded at a different address
1194+ * in a different process. This is obviously a hazard for functions in
1195+ * loadable libraries, but it can happen even for functions in the core code
1196+ * on platforms using EXEC_BACKEND (e.g., Windows).
1197+ *
1198+ * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1199+ * in favor of applying load_external_function() for core functions too;
1200+ * but that raises portability issues that are not worth addressing now.
1201+ */
1202+ static parallel_worker_main_type
1203+ LookupParallelWorkerFunction (char * libraryname , char * funcname )
1204+ {
1205+ /*
1206+ * If the function is to be loaded from postgres itself, search the
1207+ * InternalParallelWorkers array.
1208+ */
1209+ if (strcmp (libraryname , "postgres" ) == 0 )
1210+ {
1211+ int i ;
1212+
1213+ for (i = 0 ; i < lengthof (InternalParallelWorkers ); i ++ )
1214+ {
1215+ if (strcmp (InternalParallelWorkers [i ].fn_name , funcname ) == 0 )
1216+ return InternalParallelWorkers [i ].fn_addr ;
1217+ }
1218+
1219+ /* We can only reach this by programming error. */
1220+ elog (ERROR , "internal function \"%s\" not found" , funcname );
1221+ }
1222+
1223+ /* Otherwise load from external library. */
1224+ return (parallel_worker_main_type )
1225+ load_external_function (libraryname , funcname , true, NULL );
1226+ }
0 commit comments