1/*
2 * parallel.c
3 *
4 * multi-process support
5 *
6 * Copyright (c) 2010-2019, PostgreSQL Global Development Group
7 * src/bin/pg_upgrade/parallel.c
8 */
9
10#include "postgres_fe.h"
11
12#include <sys/wait.h>
13#ifdef WIN32
14#include <io.h>
15#endif
16
17#include "pg_upgrade.h"
18
19
/*
 * Number of worker children (processes, or threads on Windows) that have
 * been started and not yet collected by reap_child().
 */
static int parallel_jobs = 0;
21
#ifdef WIN32
/*
 * Handles of all worker threads that have been started and not yet reaped.
 * The live handles occupy the first parallel_jobs slots with no gaps/zeros,
 * so the array can be passed directly to WaitForMultipleObjects().  The
 * per-thread argument structs live in separate arrays (declared below) that
 * are indexed the same way, which keeps this array handles-only.
 */
HANDLE *thread_handles;

/* Arguments passed to a worker thread running win32_exec_prog() */
typedef struct
{
	char *log_file;
	char *opt_log_file;
	char *cmd;
} exec_thread_arg;

/* Arguments passed to a worker thread running win32_transfer_all_new_dbs() */
typedef struct
{
	DbInfoArr *old_db_arr;
	DbInfoArr *new_db_arr;
	char *old_pgdata;
	char *new_pgdata;
	char *old_tablespace;
} transfer_thread_arg;

/*
 * One argument struct per job slot for each kind of worker.  Both arrays
 * start out NULL and are allocated on first use by the corresponding
 * parallel_* function.
 */
exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/* track current thread_args struct so reap_child() can be used for all cases */
void **cur_thread_args;

/* Thread entry points; each receives a pointer to its argument struct */
DWORD win32_exec_prog(exec_thread_arg *args);
DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args);
#endif
55
56/*
57 * parallel_exec_prog
58 *
59 * This has the same API as exec_prog, except it does parallel execution,
60 * and therefore must throw errors and doesn't return an error status.
61 */
62void
63parallel_exec_prog(const char *log_file, const char *opt_log_file,
64 const char *fmt,...)
65{
66 va_list args;
67 char cmd[MAX_STRING];
68
69#ifndef WIN32
70 pid_t child;
71#else
72 HANDLE child;
73 exec_thread_arg *new_arg;
74#endif
75
76 va_start(args, fmt);
77 vsnprintf(cmd, sizeof(cmd), fmt, args);
78 va_end(args);
79
80 if (user_opts.jobs <= 1)
81 /* exit_on_error must be true to allow jobs */
82 exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
83 else
84 {
85 /* parallel */
86#ifdef WIN32
87 if (thread_handles == NULL)
88 thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
89
90 if (exec_thread_args == NULL)
91 {
92 int i;
93
94 exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
95
96 /*
97 * For safety and performance, we keep the args allocated during
98 * the entire life of the process, and we don't free the args in a
99 * thread different from the one that allocated it.
100 */
101 for (i = 0; i < user_opts.jobs; i++)
102 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
103 }
104
105 cur_thread_args = (void **) exec_thread_args;
106#endif
107 /* harvest any dead children */
108 while (reap_child(false) == true)
109 ;
110
111 /* must we wait for a dead child? */
112 if (parallel_jobs >= user_opts.jobs)
113 reap_child(true);
114
115 /* set this before we start the job */
116 parallel_jobs++;
117
118 /* Ensure stdio state is quiesced before forking */
119 fflush(NULL);
120
121#ifndef WIN32
122 child = fork();
123 if (child == 0)
124 /* use _exit to skip atexit() functions */
125 _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
126 else if (child < 0)
127 /* fork failed */
128 pg_fatal("could not create worker process: %s\n", strerror(errno));
129#else
130 /* empty array element are always at the end */
131 new_arg = exec_thread_args[parallel_jobs - 1];
132
133 /* Can only pass one pointer into the function, so use a struct */
134 if (new_arg->log_file)
135 pg_free(new_arg->log_file);
136 new_arg->log_file = pg_strdup(log_file);
137 if (new_arg->opt_log_file)
138 pg_free(new_arg->opt_log_file);
139 new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
140 if (new_arg->cmd)
141 pg_free(new_arg->cmd);
142 new_arg->cmd = pg_strdup(cmd);
143
144 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
145 new_arg, 0, NULL);
146 if (child == 0)
147 pg_fatal("could not create worker thread: %s\n", strerror(errno));
148
149 thread_handles[parallel_jobs - 1] = child;
150#endif
151 }
152
153 return;
154}
155
156
#ifdef WIN32
/*
 * win32_exec_prog
 *
 * Entry point of a worker thread started by parallel_exec_prog().  Runs
 * exec_prog() with the log files and command stored in *args.  The thread's
 * exit code is 0 when exec_prog() reports success and 1 otherwise.
 */
DWORD
win32_exec_prog(exec_thread_arg *args)
{
	/* returning from here terminates the thread */
	return exec_prog(args->log_file, args->opt_log_file, true, true,
					 "%s", args->cmd) ? 0 : 1;
}
#endif
169
170
171/*
172 * parallel_transfer_all_new_dbs
173 *
174 * This has the same API as transfer_all_new_dbs, except it does parallel execution
175 * by transferring multiple tablespaces in parallel
176 */
177void
178parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
179 char *old_pgdata, char *new_pgdata,
180 char *old_tablespace)
181{
182#ifndef WIN32
183 pid_t child;
184#else
185 HANDLE child;
186 transfer_thread_arg *new_arg;
187#endif
188
189 if (user_opts.jobs <= 1)
190 transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
191 else
192 {
193 /* parallel */
194#ifdef WIN32
195 if (thread_handles == NULL)
196 thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
197
198 if (transfer_thread_args == NULL)
199 {
200 int i;
201
202 transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
203
204 /*
205 * For safety and performance, we keep the args allocated during
206 * the entire life of the process, and we don't free the args in a
207 * thread different from the one that allocated it.
208 */
209 for (i = 0; i < user_opts.jobs; i++)
210 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
211 }
212
213 cur_thread_args = (void **) transfer_thread_args;
214#endif
215 /* harvest any dead children */
216 while (reap_child(false) == true)
217 ;
218
219 /* must we wait for a dead child? */
220 if (parallel_jobs >= user_opts.jobs)
221 reap_child(true);
222
223 /* set this before we start the job */
224 parallel_jobs++;
225
226 /* Ensure stdio state is quiesced before forking */
227 fflush(NULL);
228
229#ifndef WIN32
230 child = fork();
231 if (child == 0)
232 {
233 transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
234 old_tablespace);
235 /* if we take another exit path, it will be non-zero */
236 /* use _exit to skip atexit() functions */
237 _exit(0);
238 }
239 else if (child < 0)
240 /* fork failed */
241 pg_fatal("could not create worker process: %s\n", strerror(errno));
242#else
243 /* empty array element are always at the end */
244 new_arg = transfer_thread_args[parallel_jobs - 1];
245
246 /* Can only pass one pointer into the function, so use a struct */
247 new_arg->old_db_arr = old_db_arr;
248 new_arg->new_db_arr = new_db_arr;
249 if (new_arg->old_pgdata)
250 pg_free(new_arg->old_pgdata);
251 new_arg->old_pgdata = pg_strdup(old_pgdata);
252 if (new_arg->new_pgdata)
253 pg_free(new_arg->new_pgdata);
254 new_arg->new_pgdata = pg_strdup(new_pgdata);
255 if (new_arg->old_tablespace)
256 pg_free(new_arg->old_tablespace);
257 new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
258
259 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
260 new_arg, 0, NULL);
261 if (child == 0)
262 pg_fatal("could not create worker thread: %s\n", strerror(errno));
263
264 thread_handles[parallel_jobs - 1] = child;
265#endif
266 }
267
268 return;
269}
270
271
#ifdef WIN32
/*
 * win32_transfer_all_new_dbs
 *
 * Entry point of a worker thread started by
 * parallel_transfer_all_new_dbs().  Unpacks *args and runs
 * transfer_all_new_dbs(); if that call returns, the thread exits with
 * code 0.
 */
DWORD
win32_transfer_all_new_dbs(transfer_thread_arg *args)
{
	const DWORD thread_exit_code = 0;

	transfer_all_new_dbs(args->old_db_arr,
						 args->new_db_arr,
						 args->old_pgdata,
						 args->new_pgdata,
						 args->old_tablespace);

	/* returning from here terminates the thread */
	return thread_exit_code;
}
#endif
283
284
285/*
286 * collect status from a completed worker child
287 */
288bool
289reap_child(bool wait_for_child)
290{
291#ifndef WIN32
292 int work_status;
293 pid_t child;
294#else
295 int thread_num;
296 DWORD res;
297#endif
298
299 if (user_opts.jobs <= 1 || parallel_jobs == 0)
300 return false;
301
302#ifndef WIN32
303 child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
304 if (child == (pid_t) -1)
305 pg_fatal("waitpid() failed: %s\n", strerror(errno));
306 if (child == 0)
307 return false; /* no children, or no dead children */
308 if (work_status != 0)
309 pg_fatal("child process exited abnormally: status %d\n", work_status);
310#else
311 /* wait for one to finish */
312 thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
313 false, wait_for_child ? INFINITE : 0);
314
315 if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
316 return false;
317
318 /* compute thread index in active_threads */
319 thread_num -= WAIT_OBJECT_0;
320
321 /* get the result */
322 GetExitCodeThread(thread_handles[thread_num], &res);
323 if (res != 0)
324 pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
325
326 /* dispose of handle to stop leaks */
327 CloseHandle(thread_handles[thread_num]);
328
329 /* Move last slot into dead child's position */
330 if (thread_num != parallel_jobs - 1)
331 {
332 void *tmp_args;
333
334 thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
335
336 /*
337 * Move last active thead arg struct into the now-dead slot, and the
338 * now-dead slot to the end for reuse by the next thread. Though the
339 * thread struct is in use by another thread, we can safely swap the
340 * struct pointers within the array.
341 */
342 tmp_args = cur_thread_args[thread_num];
343 cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
344 cur_thread_args[parallel_jobs - 1] = tmp_args;
345 }
346#endif
347
348 /* do this after job has been removed */
349 parallel_jobs--;
350
351 return true;
352}
353