@@ -34,11 +34,24 @@ typedef struct {
3434 char log_file [MAXPGPATH ];
3535 char opt_log_file [MAXPGPATH ];
3636 char cmd [MAX_STRING ];
37- } thread_arg ;
37+ } exec_thread_arg ;
3838
39- thread_arg * * thread_args ;
39+ typedef struct {
40+ DbInfoArr * old_db_arr ;
41+ DbInfoArr * new_db_arr ;
42+ char old_pgdata [MAXPGPATH ];
43+ char new_pgdata [MAXPGPATH ];
44+ char old_tablespace [MAXPGPATH ];
45+ } transfer_thread_arg ;
46+
47+ exec_thread_arg * * exec_thread_args ;
48+ transfer_thread_arg * * transfer_thread_args ;
49+
50+ /* track current thread_args struct so reap_child() can be used for all cases */
51+ void * * cur_thread_args ;
4052
41- DWORD win32_exec_prog (thread_arg * args );
53+ DWORD win32_exec_prog (exec_thread_arg * args );
54+ DWORD win32_transfer_all_new_dbs (transfer_thread_arg * args );
4255
4356#endif
4457
@@ -58,7 +71,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
5871 pid_t child ;
5972#else
6073 HANDLE child ;
61- thread_arg * new_arg ;
74+ exec_thread_arg * new_arg ;
6275#endif
6376
6477 va_start (args , fmt );
@@ -71,7 +84,9 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
7184 else
7285 {
7386 /* parallel */
74-
87+ #ifdef WIN32
88+ cur_thread_args = (void * * )exec_thread_args ;
89+ #endif
7590 /* harvest any dead children */
7691 while (reap_child (false) == true)
7792 ;
@@ -100,19 +115,19 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
100115 int i ;
101116
102117 thread_handles = pg_malloc (user_opts .jobs * sizeof (HANDLE ));
103- thread_args = pg_malloc (user_opts .jobs * sizeof (thread_arg * ));
118+ exec_thread_args = pg_malloc (user_opts .jobs * sizeof (exec_thread_arg * ));
104119
105120 /*
106121 * For safety and performance, we keep the args allocated during
107122 * the entire life of the process, and we don't free the args
108123 * in a thread different from the one that allocated it.
109124 */
110125 for (i = 0 ; i < user_opts .jobs ; i ++ )
111- thread_args [i ] = pg_malloc (sizeof (thread_arg ));
126+ exec_thread_args [i ] = pg_malloc (sizeof (exec_thread_arg ));
112127 }
113128
114129 /* use first empty array element */
115- new_arg = thread_args [parallel_jobs - 1 ];
130+ new_arg = exec_thread_args [parallel_jobs - 1 ];
116131
117132 /* Can only pass one pointer into the function, so use a struct */
118133 strcpy (new_arg -> log_file , log_file );
@@ -134,7 +149,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file,
134149
135150#ifdef WIN32
136151DWORD
137- win32_exec_prog (thread_arg * args )
152+ win32_exec_prog (exec_thread_arg * args )
138153{
139154 int ret ;
140155
@@ -146,6 +161,112 @@ win32_exec_prog(thread_arg *args)
146161#endif
147162
148163
164+ /*
165+ * parallel_transfer_all_new_dbs
166+ *
167+ * This has the same API as transfer_all_new_dbs, except it does parallel execution
168+ * by transfering multiple tablespaces in parallel
169+ */
170+ void parallel_transfer_all_new_dbs (DbInfoArr * old_db_arr , DbInfoArr * new_db_arr ,
171+ char * old_pgdata , char * new_pgdata ,
172+ char * old_tablespace )
173+ {
174+ #ifndef WIN32
175+ pid_t child ;
176+ #else
177+ HANDLE child ;
178+ transfer_thread_arg * new_arg ;
179+ #endif
180+
181+ if (user_opts .jobs <= 1 )
182+ /* throw_error must be true to allow jobs */
183+ transfer_all_new_dbs (old_db_arr , new_db_arr , old_pgdata , new_pgdata , NULL );
184+ else
185+ {
186+ /* parallel */
187+ #ifdef WIN32
188+ cur_thread_args = (void * * )transfer_thread_args ;
189+ #endif
190+ /* harvest any dead children */
191+ while (reap_child (false) == true)
192+ ;
193+
194+ /* must we wait for a dead child? */
195+ if (parallel_jobs >= user_opts .jobs )
196+ reap_child (true);
197+
198+ /* set this before we start the job */
199+ parallel_jobs ++ ;
200+
201+ /* Ensure stdio state is quiesced before forking */
202+ fflush (NULL );
203+
204+ #ifndef WIN32
205+ child = fork ();
206+ if (child == 0 )
207+ {
208+ transfer_all_new_dbs (old_db_arr , new_db_arr , old_pgdata , new_pgdata ,
209+ old_tablespace );
210+ /* if we take another exit path, it will be non-zero */
211+ /* use _exit to skip atexit() functions */
212+ _exit (0 );
213+ }
214+ else if (child < 0 )
215+ /* fork failed */
216+ pg_log (PG_FATAL , "could not create worker process: %s\n" , strerror (errno ));
217+ #else
218+ if (thread_handles == NULL )
219+ {
220+ int i ;
221+
222+ thread_handles = pg_malloc (user_opts .jobs * sizeof (HANDLE ));
223+ transfer_thread_args = pg_malloc (user_opts .jobs * sizeof (transfer_thread_arg * ));
224+
225+ /*
226+ * For safety and performance, we keep the args allocated during
227+ * the entire life of the process, and we don't free the args
228+ * in a thread different from the one that allocated it.
229+ */
230+ for (i = 0 ; i < user_opts .jobs ; i ++ )
231+ transfer_thread_args [i ] = pg_malloc (sizeof (transfer_thread_arg ));
232+ }
233+
234+ /* use first empty array element */
235+ new_arg = transfer_thread_args [parallel_jobs - 1 ];
236+
237+ /* Can only pass one pointer into the function, so use a struct */
238+ new_arg -> old_db_arr = old_db_arr ;
239+ new_arg -> new_db_arr = new_db_arr ;
240+ strcpy (new_arg -> old_pgdata , old_pgdata );
241+ strcpy (new_arg -> new_pgdata , new_pgdata );
242+ strcpy (new_arg -> old_tablespace , old_tablespace );
243+
244+ child = (HANDLE ) _beginthreadex (NULL , 0 , (void * ) win32_exec_prog ,
245+ new_arg , 0 , NULL );
246+ if (child == 0 )
247+ pg_log (PG_FATAL , "could not create worker thread: %s\n" , strerror (errno ));
248+
249+ thread_handles [parallel_jobs - 1 ] = child ;
250+ #endif
251+ }
252+
253+ return ;
254+ }
255+
256+
257+ #ifdef WIN32
258+ DWORD
259+ win32_transfer_all_new_dbs (transfer_thread_arg * args )
260+ {
261+ transfer_all_new_dbs (args -> old_db_arr , args -> new_db_arr , args -> old_pgdata ,
262+ args -> new_pgdata , args -> old_tablespace );
263+
264+ /* terminates thread */
265+ return 0 ;
266+ }
267+ #endif
268+
269+
149270/*
150271 * collect status from a completed worker child
151272 */
@@ -195,7 +316,7 @@ reap_child(bool wait_for_child)
195316 /* Move last slot into dead child's position */
196317 if (thread_num != parallel_jobs - 1 )
197318 {
198- thread_arg * tmp_args ;
319+ void * tmp_args ;
199320
200321 thread_handles [thread_num ] = thread_handles [parallel_jobs - 1 ];
201322
@@ -205,9 +326,9 @@ reap_child(bool wait_for_child)
205326 * reused by the next created thread. Instead, the new thread
206327 * will use the arg struct of the thread that just died.
207328 */
208- tmp_args = thread_args [thread_num ];
209- thread_args [thread_num ] = thread_args [parallel_jobs - 1 ];
210- thread_args [parallel_jobs - 1 ] = tmp_args ;
329+ tmp_args = cur_thread_args [thread_num ];
330+ cur_thread_args [thread_num ] = cur_thread_args [parallel_jobs - 1 ];
331+ cur_thread_args [parallel_jobs - 1 ] = tmp_args ;
211332 }
212333#endif
213334
0 commit comments