1/*-------------------------------------------------------------------------
2 *
3 * parallel.c
4 *
5 * Parallel support for pg_dump and pg_restore
6 *
7 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/bin/pg_dump/parallel.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16/*
17 * Parallel operation works like this:
18 *
19 * The original, master process calls ParallelBackupStart(), which forks off
20 * the desired number of worker processes, which each enter WaitForCommands().
21 *
22 * The master process dispatches an individual work item to one of the worker
23 * processes in DispatchJobForTocEntry(). We send a command string such as
24 * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 * The worker process receives and decodes the command and passes it to the
26 * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 * which are routines of the current archive format. That routine performs
28 * the required action (dump or restore) and returns an integer status code.
29 * This is passed back to the master where we pass it to the
30 * ParallelCompletionPtr callback function that was passed to
31 * DispatchJobForTocEntry(). The callback function does state updating
32 * for the master control logic in pg_backup_archiver.c.
33 *
34 * In principle additional archive-format-specific information might be needed
35 * in commands or worker status responses, but so far that hasn't proved
36 * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 * data structures. Remember that we have forked off the workers only after
38 * we have read in the catalog. That's why our worker processes can also
39 * access the catalog information. (In the Windows case, the workers are
40 * threads in the same process. To avoid problems, they work with cloned
41 * copies of the Archive data structure; see RunWorker().)
42 *
43 * In the master process, the workerStatus field for each worker has one of
44 * the following values:
45 * WRKR_IDLE: it's waiting for a command
46 * WRKR_WORKING: it's working on a command
47 * WRKR_TERMINATED: process ended
48 * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
49 * state, and must be NULL in other states.
50 */
51
52#include "postgres_fe.h"
53
54#ifndef WIN32
55#include <sys/wait.h>
56#include <signal.h>
57#include <unistd.h>
58#include <fcntl.h>
59#endif
60#ifdef HAVE_SYS_SELECT_H
61#include <sys/select.h>
62#endif
63
64#include "parallel.h"
65#include "pg_backup_utils.h"
66
67#include "fe_utils/string_utils.h"
68#include "port/pg_bswap.h"
69
/*
 * Mnemonic macros for indexing the fd array returned by pipe(2): element 0
 * is the read end and element 1 is the write end.
 */
#define PIPE_READ							0
#define PIPE_WRITE							1

#define NO_SLOT (-1)			/* Failure result for GetIdleWorker() */
75
/*
 * Worker process statuses, as tracked by the master in each ParallelSlot.
 *
 * Note that WRKR_IDLE is the zero value, so a slot whose memory has merely
 * been zeroed (e.g. by pg_malloc0()) reads as idle.
 */
typedef enum
{
	WRKR_IDLE = 0,				/* waiting for a command */
	WRKR_WORKING = 1,			/* executing a command */
	WRKR_TERMINATED = 2			/* process/thread has ended */
} T_WorkerStatus;
83
/*
 * Private per-parallel-worker state (typedef for this is in parallel.h).
 *
 * Much of this is valid only in the master process (or, on Windows, should
 * be touched only by the master thread).  But the AH field should be touched
 * only by workers.  The pipe descriptors are valid everywhere.
 *
 * (On Windows the master does read other slots' AH fields, under
 * signal_info_lock, in ShutdownWorkersHard() and consoleHandler().)
 */
struct ParallelSlot
{
	T_WorkerStatus workerStatus;	/* see enum above */

	/* These fields are valid if workerStatus == WRKR_WORKING: */
	ParallelCompletionPtr callback; /* function to call on completion */
	void	   *callback_data;	/* passthrough data for it */

	/*
	 * The worker's cloned ArchiveHandle, set by RunWorker() via
	 * set_cancel_slot_archive() and reset to NULL before it disconnects.
	 */
	ArchiveHandle *AH;			/* Archive data worker is using */

	/*
	 * Master's ends of the pipes: pipeRead is the read end of the
	 * worker->master pipe, and pipeWrite is the write end of the
	 * master->worker pipe.
	 */
	int			pipeRead;
	int			pipeWrite;

	/*
	 * Child's ends of the pipes: pipeRevRead is the read end of the
	 * master->worker pipe, and pipeRevWrite is the write end of the
	 * worker->master pipe.
	 */
	int			pipeRevRead;
	int			pipeRevWrite;

	/*
	 * Child process/thread identity info.  pid is 0 when there is no live
	 * worker process to signal.  hThread is set to INVALID_HANDLE_VALUE
	 * once the thread has been reaped.
	 */
#ifdef WIN32
	uintptr_t	hThread;
	unsigned int threadId;
#else
	pid_t		pid;
#endif
};
114
#ifdef WIN32

/*
 * Structure to hold info passed by _beginthreadex() to the function it calls
 * via its single allowed argument.  ParallelBackupStart() allocates one per
 * worker thread, and init_spawned_worker_win32() frees it after reading it.
 */
typedef struct
{
	ArchiveHandle *AH;			/* master database connection */
	ParallelSlot *slot;			/* this worker's parallel slot */
} WorkerInfo;

/*
 * Windows implementation of pipe access.  pgpipe() and piperead() are
 * static functions defined later in this file.  Writes go through send(),
 * and the descriptors are closed with closesocket(), so these "pipes" are
 * presumably sockets -- NOTE(review): confirm against pgpipe().
 */
static int	pgpipe(int handles[2]);
static int	piperead(int s, char *buf, int len);
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Non-Windows implementation of pipe access: plain pipe(2)/read(2)/write(2) */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */
140
/*
 * State info for archive_close_connection() shutdown callback.
 *
 * AHX is set by on_exit_close_archive().  pstate is set by
 * ParallelBackupStart() when workers are launched, and reset to NULL by
 * ParallelBackupEnd().  A NULL pstate means non-parallel operation.
 */
typedef struct ShutdownInformation
{
	ParallelState *pstate;		/* parallel state, or NULL */
	Archive    *AHX;			/* master archive handle, or NULL */
} ShutdownInformation;

static ShutdownInformation shutdown_info;
151
/*
 * State info for signal handling.
 * signal_info has static storage duration, so it is guaranteed to start out
 * zeroed (NULL pointers, false flags).
 *
 * On Unix, myAH is the master DB connection in the master process, and the
 * worker's own connection in worker processes.  On Windows, we have only one
 * instance of signal_info, so myAH is the master connection and the worker
 * connections must be dug out of pstate->parallelSlot[].
 */
typedef struct DumpSignalInformation
{
	ArchiveHandle *myAH;		/* database connection to issue cancel for */
	ParallelState *pstate;		/* parallel state, if any */
	bool		handler_set;	/* signal handler set up in this process? */
#ifndef WIN32
	bool		am_worker;		/* am I a worker process? */
#endif
} DumpSignalInformation;

/*
 * Declared volatile because it is read asynchronously by sigTermHandler()
 * (Unix) or consoleHandler() (Windows).  On Windows, the actual interlock
 * against the console-handler thread is signal_info_lock.
 */
static volatile DumpSignalInformation signal_info;

#ifdef WIN32
/*
 * Guards signal_info and the worker slots' AH/connCancel pointers against
 * concurrent use by consoleHandler().  Initialized in setup_cancel_handler().
 */
static CRITICAL_SECTION signal_info_lock;
#endif
176
/*
 * Write a simple string to stderr --- must be safe in a signal handler, so
 * this calls write(2) on stderr's file descriptor directly rather than
 * going through stdio.  The argument is evaluated exactly once.
 *
 * We ignore the write() result since there's not much we could do about it.
 * Certain compilers make that harder than it ought to be (presumably by
 * warning when the result is discarded), hence the rc_ variable.
 */
#define write_stderr(str) \
	do { \
		const char *str_ = (str); \
		int		rc_; \
		rc_ = write(fileno(stderr), str_, strlen(str_)); \
		(void) rc_; \
	} while (0)
189
190
#ifdef WIN32
/* file-scope variables */
static DWORD tls_index;			/* TLS slot used by getThreadLocalPQExpBuffer() */

/* globally visible variables (needed by exit_nicely) */
bool		parallel_init_done = false; /* init_parallel_dump_utils() has run */
DWORD		mainThreadId;		/* thread that called init_parallel_dump_utils() */
#endif							/* WIN32 */
199
/* Local function prototypes */
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);
static void setup_cancel_handler(void);
static void set_cancel_pstate(ParallelState *pstate);
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
static int	GetIdleWorker(ParallelState *pstate);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
							bool do_wait);
static char *getMessageFromMaster(int pipefd[2]);
static void sendMessageToMaster(int pipefd[2], const char *str);
static int	select_loop(int maxFd, fd_set *workerset);
static char *getMessageFromWorker(ParallelState *pstate,
								  bool do_wait, int *worker);
static void sendMessageToWorker(ParallelState *pstate,
								int worker, const char *str);
static char *readMessageFromPipe(int fd);

/*
 * True if the string msg begins with prefix.  Note that prefix is evaluated
 * twice, so it should be a side-effect-free expression (normally a literal).
 */
#define messageStartsWith(msg, prefix) \
	(strncmp(msg, prefix, strlen(prefix)) == 0)
226
227
/*
 * on_exit_nicely() callback that shuts down Winsock (Windows only).
 */
#ifdef WIN32
static void
shutdown_parallel_dump_utils(int code, void *unused)
{
	/* Winsock is torn down only from the main thread */
	if (GetCurrentThreadId() != mainThreadId)
		return;

	WSACleanup();
}
#endif
240
/*
 * Initialize parallel dump support.  Call this early in process startup; it
 * is currently called whether or not parallel activity is intended.
 *
 * On Windows this allocates the TLS slot, records the main thread's ID, and
 * starts Winsock, arranging for it to be shut down at exit.  Calls after the
 * first are no-ops.  On other platforms there is nothing to do.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		wsaData;
	int			err;

	if (parallel_init_done)
		return;

	/* Prepare for threaded operation */
	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	/* Start up socket access ... */
	err = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (err != 0)
	{
		pg_log_error("WSAStartup failed: %d", err);
		exit_nicely(1);
	}

	/* ... and make sure it gets shut down at exit */
	on_exit_nicely(shutdown_parallel_dump_utils, NULL);

	parallel_init_done = true;
#endif
}
272
273/*
274 * Find the ParallelSlot for the current worker process or thread.
275 *
276 * Returns NULL if no matching slot is found (this implies we're the master).
277 */
278static ParallelSlot *
279GetMyPSlot(ParallelState *pstate)
280{
281 int i;
282
283 for (i = 0; i < pstate->numWorkers; i++)
284 {
285#ifdef WIN32
286 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
287#else
288 if (pstate->parallelSlot[i].pid == getpid())
289#endif
290 return &(pstate->parallelSlot[i]);
291 }
292
293 return NULL;
294}
295
/*
 * Thread-local replacement for getLocalPQExpBuffer().
 *
 * Each thread owns one buffer that is reset and handed back on every call.
 * Memory use is therefore one buffer per thread, rather than one per
 * fmtId()/fmtQualifiedId() call.  The returned buffer is overwritten by the
 * next call from the same thread, so this is not reentrant.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * Until parallel_init_done is set, the buffer lives in a plain static.
	 * After that it lives only in the TLS slot, and the static is never
	 * touched (the Tls code goes awry if we also use a static var).
	 * TlsGetValue() yields 0 for a slot that has not been set yet.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer buf;

	buf = parallel_init_done ?
		(PQExpBuffer) TlsGetValue(tls_index) : s_id_return;

	if (buf != NULL)
	{
		/* Not the first call: reuse the buffer, discarding old contents */
		resetPQExpBuffer(buf);
		return buf;
	}

	/* First call in this thread (or pre-init): create and remember a buffer */
	buf = createPQExpBuffer();
	if (parallel_init_done)
		TlsSetValue(tls_index, buf);
	else
		s_id_return = buf;

	return buf;
}
#endif							/* WIN32 */
337
338/*
339 * pg_dump and pg_restore call this to register the cleanup handler
340 * as soon as they've created the ArchiveHandle.
341 */
342void
343on_exit_close_archive(Archive *AHX)
344{
345 shutdown_info.AHX = AHX;
346 on_exit_nicely(archive_close_connection, &shutdown_info);
347}
348
349/*
350 * on_exit_nicely handler for shutting down database connections and
351 * worker processes cleanly.
352 */
353static void
354archive_close_connection(int code, void *arg)
355{
356 ShutdownInformation *si = (ShutdownInformation *) arg;
357
358 if (si->pstate)
359 {
360 /* In parallel mode, must figure out who we are */
361 ParallelSlot *slot = GetMyPSlot(si->pstate);
362
363 if (!slot)
364 {
365 /*
366 * We're the master. Forcibly shut down workers, then close our
367 * own database connection, if any.
368 */
369 ShutdownWorkersHard(si->pstate);
370
371 if (si->AHX)
372 DisconnectDatabase(si->AHX);
373 }
374 else
375 {
376 /*
377 * We're a worker. Shut down our own DB connection if any. On
378 * Windows, we also have to close our communication sockets, to
379 * emulate what will happen on Unix when the worker process exits.
380 * (Without this, if this is a premature exit, the master would
381 * fail to detect it because there would be no EOF condition on
382 * the other end of the pipe.)
383 */
384 if (slot->AH)
385 DisconnectDatabase(&(slot->AH->public));
386
387#ifdef WIN32
388 closesocket(slot->pipeRevRead);
389 closesocket(slot->pipeRevWrite);
390#endif
391 }
392 }
393 else
394 {
395 /* Non-parallel operation: just kill the master DB connection */
396 if (si->AHX)
397 DisconnectDatabase(si->AHX);
398 }
399}
400
401/*
402 * Forcibly shut down any remaining workers, waiting for them to finish.
403 *
404 * Note that we don't expect to come here during normal exit (the workers
405 * should be long gone, and the ParallelState too). We're only here in a
406 * fatal() situation, so intervening to cancel active commands is
407 * appropriate.
408 */
409static void
410ShutdownWorkersHard(ParallelState *pstate)
411{
412 int i;
413
414 /*
415 * Close our write end of the sockets so that any workers waiting for
416 * commands know they can exit.
417 */
418 for (i = 0; i < pstate->numWorkers; i++)
419 closesocket(pstate->parallelSlot[i].pipeWrite);
420
421 /*
422 * Force early termination of any commands currently in progress.
423 */
424#ifndef WIN32
425 /* On non-Windows, send SIGTERM to each worker process. */
426 for (i = 0; i < pstate->numWorkers; i++)
427 {
428 pid_t pid = pstate->parallelSlot[i].pid;
429
430 if (pid != 0)
431 kill(pid, SIGTERM);
432 }
433#else
434
435 /*
436 * On Windows, send query cancels directly to the workers' backends. Use
437 * a critical section to ensure worker threads don't change state.
438 */
439 EnterCriticalSection(&signal_info_lock);
440 for (i = 0; i < pstate->numWorkers; i++)
441 {
442 ArchiveHandle *AH = pstate->parallelSlot[i].AH;
443 char errbuf[1];
444
445 if (AH != NULL && AH->connCancel != NULL)
446 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
447 }
448 LeaveCriticalSection(&signal_info_lock);
449#endif
450
451 /* Now wait for them to terminate. */
452 WaitForTerminatingWorkers(pstate);
453}
454
455/*
456 * Wait for all workers to terminate.
457 */
458static void
459WaitForTerminatingWorkers(ParallelState *pstate)
460{
461 while (!HasEveryWorkerTerminated(pstate))
462 {
463 ParallelSlot *slot = NULL;
464 int j;
465
466#ifndef WIN32
467 /* On non-Windows, use wait() to wait for next worker to end */
468 int status;
469 pid_t pid = wait(&status);
470
471 /* Find dead worker's slot, and clear the PID field */
472 for (j = 0; j < pstate->numWorkers; j++)
473 {
474 slot = &(pstate->parallelSlot[j]);
475 if (slot->pid == pid)
476 {
477 slot->pid = 0;
478 break;
479 }
480 }
481#else /* WIN32 */
482 /* On Windows, we must use WaitForMultipleObjects() */
483 HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
484 int nrun = 0;
485 DWORD ret;
486 uintptr_t hThread;
487
488 for (j = 0; j < pstate->numWorkers; j++)
489 {
490 if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
491 {
492 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
493 nrun++;
494 }
495 }
496 ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
497 Assert(ret != WAIT_FAILED);
498 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
499 free(lpHandles);
500
501 /* Find dead worker's slot, and clear the hThread field */
502 for (j = 0; j < pstate->numWorkers; j++)
503 {
504 slot = &(pstate->parallelSlot[j]);
505 if (slot->hThread == hThread)
506 {
507 /* For cleanliness, close handles for dead threads */
508 CloseHandle((HANDLE) slot->hThread);
509 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
510 break;
511 }
512 }
513#endif /* WIN32 */
514
515 /* On all platforms, update workerStatus and te[] as well */
516 Assert(j < pstate->numWorkers);
517 slot->workerStatus = WRKR_TERMINATED;
518 pstate->te[j] = NULL;
519 }
520}
521
522
523/*
524 * Code for responding to cancel interrupts (SIGINT, control-C, etc)
525 *
526 * This doesn't quite belong in this module, but it needs access to the
527 * ParallelState data, so there's not really a better place either.
528 *
529 * When we get a cancel interrupt, we could just die, but in pg_restore that
530 * could leave a SQL command (e.g., CREATE INDEX on a large table) running
531 * for a long time. Instead, we try to send a cancel request and then die.
532 * pg_dump probably doesn't really need this, but we might as well use it
533 * there too. Note that sending the cancel directly from the signal handler
534 * is safe because PQcancel() is written to make it so.
535 *
536 * In parallel operation on Unix, each process is responsible for canceling
537 * its own connection (this must be so because nobody else has access to it).
538 * Furthermore, the master process should attempt to forward its signal to
539 * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
540 * needed because typing control-C at the console would deliver SIGINT to
541 * every member of the terminal process group --- but in other scenarios it
542 * might be that only the master gets signaled.
543 *
544 * On Windows, the cancel handler runs in a separate thread, because that's
545 * how SetConsoleCtrlHandler works. We make it stop worker threads, send
546 * cancels on all active connections, and then return FALSE, which will allow
547 * the process to die. For safety's sake, we use a critical section to
548 * protect the PGcancel structures against being changed while the signal
549 * thread runs.
550 */
551
552#ifndef WIN32
553
554/*
555 * Signal handler (Unix only)
556 */
557static void
558sigTermHandler(SIGNAL_ARGS)
559{
560 int i;
561 char errbuf[1];
562
563 /*
564 * Some platforms allow delivery of new signals to interrupt an active
565 * signal handler. That could muck up our attempt to send PQcancel, so
566 * disable the signals that setup_cancel_handler enabled.
567 */
568 pqsignal(SIGINT, SIG_IGN);
569 pqsignal(SIGTERM, SIG_IGN);
570 pqsignal(SIGQUIT, SIG_IGN);
571
572 /*
573 * If we're in the master, forward signal to all workers. (It seems best
574 * to do this before PQcancel; killing the master transaction will result
575 * in invalid-snapshot errors from active workers, which maybe we can
576 * quiet by killing workers first.) Ignore any errors.
577 */
578 if (signal_info.pstate != NULL)
579 {
580 for (i = 0; i < signal_info.pstate->numWorkers; i++)
581 {
582 pid_t pid = signal_info.pstate->parallelSlot[i].pid;
583
584 if (pid != 0)
585 kill(pid, SIGTERM);
586 }
587 }
588
589 /*
590 * Send QueryCancel if we have a connection to send to. Ignore errors,
591 * there's not much we can do about them anyway.
592 */
593 if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
594 (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
595
596 /*
597 * Report we're quitting, using nothing more complicated than write(2).
598 * When in parallel operation, only the master process should do this.
599 */
600 if (!signal_info.am_worker)
601 {
602 if (progname)
603 {
604 write_stderr(progname);
605 write_stderr(": ");
606 }
607 write_stderr("terminated by user\n");
608 }
609
610 /* And die. */
611 exit(1);
612}
613
614/*
615 * Enable cancel interrupt handler, if not already done.
616 */
617static void
618setup_cancel_handler(void)
619{
620 /*
621 * When forking, signal_info.handler_set will propagate into the new
622 * process, but that's fine because the signal handler state does too.
623 */
624 if (!signal_info.handler_set)
625 {
626 signal_info.handler_set = true;
627
628 pqsignal(SIGINT, sigTermHandler);
629 pqsignal(SIGTERM, sigTermHandler);
630 pqsignal(SIGQUIT, sigTermHandler);
631 }
632}
633
#else							/* WIN32 */

/*
 * Console interrupt handler --- runs in a newly-started thread.
 *
 * After stopping other threads and sending cancel requests on all open
 * connections, we return FALSE which will allow the default ExitProcess()
 * action to be taken.
 *
 * Only CTRL_C_EVENT and CTRL_BREAK_EVENT are acted on here; for any other
 * control event type we do nothing and just return FALSE.
 */
static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)
{
	int			i;
	char		errbuf[1];		/* PQcancel error text is ignored anyway */

	if (dwCtrlType == CTRL_C_EVENT ||
		dwCtrlType == CTRL_BREAK_EVENT)
	{
		/* Critical section prevents changing data we look at here */
		EnterCriticalSection(&signal_info_lock);

		/*
		 * If in parallel mode, stop worker threads and send QueryCancel to
		 * their connected backends.  The main point of stopping the worker
		 * threads is to keep them from reporting the query cancels as errors,
		 * which would clutter the user's screen.  We needn't stop the master
		 * thread since it won't be doing much anyway.  Do this before
		 * canceling the main transaction, else we might get invalid-snapshot
		 * errors reported before we can stop the workers.  Ignore errors,
		 * there's not much we can do about them anyway.
		 */
		if (signal_info.pstate != NULL)
		{
			for (i = 0; i < signal_info.pstate->numWorkers; i++)
			{
				ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
				ArchiveHandle *AH = slot->AH;
				HANDLE		hThread = (HANDLE) slot->hThread;

				/*
				 * Using TerminateThread here may leave some resources leaked,
				 * but it doesn't matter since we're about to end the whole
				 * process.  Threads already reaped by WaitForTerminatingWorkers
				 * have hThread == INVALID_HANDLE_VALUE and are skipped.
				 */
				if (hThread != INVALID_HANDLE_VALUE)
					TerminateThread(hThread, 0);

				if (AH != NULL && AH->connCancel != NULL)
					(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
			}
		}

		/*
		 * Send QueryCancel to master connection, if enabled.  Ignore errors,
		 * there's not much we can do about them anyway.
		 */
		if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
			(void) PQcancel(signal_info.myAH->connCancel,
							errbuf, sizeof(errbuf));

		LeaveCriticalSection(&signal_info_lock);

		/*
		 * Report we're quitting, using nothing more complicated than
		 * write(2).  (We might be able to get away with using pg_log_*()
		 * here, but since we terminated other threads uncleanly above, it
		 * seems better to assume as little as possible.)
		 */
		if (progname)
		{
			write_stderr(progname);
			write_stderr(": ");
		}
		write_stderr("terminated by user\n");
	}

	/* Always return FALSE to allow signal handling to continue */
	return FALSE;
}
713
714/*
715 * Enable cancel interrupt handler, if not already done.
716 */
717static void
718setup_cancel_handler(void)
719{
720 if (!signal_info.handler_set)
721 {
722 signal_info.handler_set = true;
723
724 InitializeCriticalSection(&signal_info_lock);
725
726 SetConsoleCtrlHandler(consoleHandler, TRUE);
727 }
728}
729
730#endif /* WIN32 */
731
732
733/*
734 * set_archive_cancel_info
735 *
736 * Fill AH->connCancel with cancellation info for the specified database
737 * connection; or clear it if conn is NULL.
738 */
739void
740set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
741{
742 PGcancel *oldConnCancel;
743
744 /*
745 * Activate the interrupt handler if we didn't yet in this process. On
746 * Windows, this also initializes signal_info_lock; therefore it's
747 * important that this happen at least once before we fork off any
748 * threads.
749 */
750 setup_cancel_handler();
751
752 /*
753 * On Unix, we assume that storing a pointer value is atomic with respect
754 * to any possible signal interrupt. On Windows, use a critical section.
755 */
756
757#ifdef WIN32
758 EnterCriticalSection(&signal_info_lock);
759#endif
760
761 /* Free the old one if we have one */
762 oldConnCancel = AH->connCancel;
763 /* be sure interrupt handler doesn't use pointer while freeing */
764 AH->connCancel = NULL;
765
766 if (oldConnCancel != NULL)
767 PQfreeCancel(oldConnCancel);
768
769 /* Set the new one if specified */
770 if (conn)
771 AH->connCancel = PQgetCancel(conn);
772
773 /*
774 * On Unix, there's only ever one active ArchiveHandle per process, so we
775 * can just set signal_info.myAH unconditionally. On Windows, do that
776 * only in the main thread; worker threads have to make sure their
777 * ArchiveHandle appears in the pstate data, which is dealt with in
778 * RunWorker().
779 */
780#ifndef WIN32
781 signal_info.myAH = AH;
782#else
783 if (mainThreadId == GetCurrentThreadId())
784 signal_info.myAH = AH;
785#endif
786
787#ifdef WIN32
788 LeaveCriticalSection(&signal_info_lock);
789#endif
790}
791
792/*
793 * set_cancel_pstate
794 *
795 * Set signal_info.pstate to point to the specified ParallelState, if any.
796 * We need this mainly to have an interlock against Windows signal thread.
797 */
798static void
799set_cancel_pstate(ParallelState *pstate)
800{
801#ifdef WIN32
802 EnterCriticalSection(&signal_info_lock);
803#endif
804
805 signal_info.pstate = pstate;
806
807#ifdef WIN32
808 LeaveCriticalSection(&signal_info_lock);
809#endif
810}
811
812/*
813 * set_cancel_slot_archive
814 *
815 * Set ParallelSlot's AH field to point to the specified archive, if any.
816 * We need this mainly to have an interlock against Windows signal thread.
817 */
818static void
819set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
820{
821#ifdef WIN32
822 EnterCriticalSection(&signal_info_lock);
823#endif
824
825 slot->AH = AH;
826
827#ifdef WIN32
828 LeaveCriticalSection(&signal_info_lock);
829#endif
830}
831
832
833/*
834 * This function is called by both Unix and Windows variants to set up
835 * and run a worker process. Caller should exit the process (or thread)
836 * upon return.
837 */
838static void
839RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
840{
841 int pipefd[2];
842
843 /* fetch child ends of pipes */
844 pipefd[PIPE_READ] = slot->pipeRevRead;
845 pipefd[PIPE_WRITE] = slot->pipeRevWrite;
846
847 /*
848 * Clone the archive so that we have our own state to work with, and in
849 * particular our own database connection.
850 *
851 * We clone on Unix as well as Windows, even though technically we don't
852 * need to because fork() gives us a copy in our own address space
853 * already. But CloneArchive resets the state information and also clones
854 * the database connection which both seem kinda helpful.
855 */
856 AH = CloneArchive(AH);
857
858 /* Remember cloned archive where signal handler can find it */
859 set_cancel_slot_archive(slot, AH);
860
861 /*
862 * Call the setup worker function that's defined in the ArchiveHandle.
863 */
864 (AH->SetupWorkerPtr) ((Archive *) AH);
865
866 /*
867 * Execute commands until done.
868 */
869 WaitForCommands(AH, pipefd);
870
871 /*
872 * Disconnect from database and clean up.
873 */
874 set_cancel_slot_archive(slot, NULL);
875 DisconnectDatabase(&(AH->public));
876 DeCloneArchive(AH);
877}
878
/*
 * Thread entry point for a Windows worker, started via _beginthreadex().
 * Takes ownership of the WorkerInfo argument and frees it.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	WorkerInfo	info = *wi;		/* copy out before releasing the original */

	free(wi);

	RunWorker(info.AH, info.slot);

	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
900
901/*
902 * This function starts a parallel dump or restore by spawning off the worker
903 * processes. For Windows, it creates a number of threads; on Unix the
904 * workers are created with fork().
905 */
906ParallelState *
907ParallelBackupStart(ArchiveHandle *AH)
908{
909 ParallelState *pstate;
910 int i;
911
912 Assert(AH->public.numWorkers > 0);
913
914 pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
915
916 pstate->numWorkers = AH->public.numWorkers;
917 pstate->te = NULL;
918 pstate->parallelSlot = NULL;
919
920 if (AH->public.numWorkers == 1)
921 return pstate;
922
923 pstate->te = (TocEntry **)
924 pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
925 pstate->parallelSlot = (ParallelSlot *)
926 pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
927
928#ifdef WIN32
929 /* Make fmtId() and fmtQualifiedId() use thread-local storage */
930 getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
931#endif
932
933 /*
934 * Set the pstate in shutdown_info, to tell the exit handler that it must
935 * clean up workers as well as the main database connection. But we don't
936 * set this in signal_info yet, because we don't want child processes to
937 * inherit non-NULL signal_info.pstate.
938 */
939 shutdown_info.pstate = pstate;
940
941 /*
942 * Temporarily disable query cancellation on the master connection. This
943 * ensures that child processes won't inherit valid AH->connCancel
944 * settings and thus won't try to issue cancels against the master's
945 * connection. No harm is done if we fail while it's disabled, because
946 * the master connection is idle at this point anyway.
947 */
948 set_archive_cancel_info(AH, NULL);
949
950 /* Ensure stdio state is quiesced before forking */
951 fflush(NULL);
952
953 /* Create desired number of workers */
954 for (i = 0; i < pstate->numWorkers; i++)
955 {
956#ifdef WIN32
957 WorkerInfo *wi;
958 uintptr_t handle;
959#else
960 pid_t pid;
961#endif
962 ParallelSlot *slot = &(pstate->parallelSlot[i]);
963 int pipeMW[2],
964 pipeWM[2];
965
966 /* Create communication pipes for this worker */
967 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
968 fatal("could not create communication channels: %m");
969
970 pstate->te[i] = NULL; /* just for safety */
971
972 slot->workerStatus = WRKR_IDLE;
973 slot->AH = NULL;
974 slot->callback = NULL;
975 slot->callback_data = NULL;
976
977 /* master's ends of the pipes */
978 slot->pipeRead = pipeWM[PIPE_READ];
979 slot->pipeWrite = pipeMW[PIPE_WRITE];
980 /* child's ends of the pipes */
981 slot->pipeRevRead = pipeMW[PIPE_READ];
982 slot->pipeRevWrite = pipeWM[PIPE_WRITE];
983
984#ifdef WIN32
985 /* Create transient structure to pass args to worker function */
986 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
987
988 wi->AH = AH;
989 wi->slot = slot;
990
991 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
992 wi, 0, &(slot->threadId));
993 slot->hThread = handle;
994#else /* !WIN32 */
995 pid = fork();
996 if (pid == 0)
997 {
998 /* we are the worker */
999 int j;
1000
1001 /* this is needed for GetMyPSlot() */
1002 slot->pid = getpid();
1003
1004 /* instruct signal handler that we're in a worker now */
1005 signal_info.am_worker = true;
1006
1007 /* close read end of Worker -> Master */
1008 closesocket(pipeWM[PIPE_READ]);
1009 /* close write end of Master -> Worker */
1010 closesocket(pipeMW[PIPE_WRITE]);
1011
1012 /*
1013 * Close all inherited fds for communication of the master with
1014 * previously-forked workers.
1015 */
1016 for (j = 0; j < i; j++)
1017 {
1018 closesocket(pstate->parallelSlot[j].pipeRead);
1019 closesocket(pstate->parallelSlot[j].pipeWrite);
1020 }
1021
1022 /* Run the worker ... */
1023 RunWorker(AH, slot);
1024
1025 /* We can just exit(0) when done */
1026 exit(0);
1027 }
1028 else if (pid < 0)
1029 {
1030 /* fork failed */
1031 fatal("could not create worker process: %m");
1032 }
1033
1034 /* In Master after successful fork */
1035 slot->pid = pid;
1036
1037 /* close read end of Master -> Worker */
1038 closesocket(pipeMW[PIPE_READ]);
1039 /* close write end of Worker -> Master */
1040 closesocket(pipeWM[PIPE_WRITE]);
1041#endif /* WIN32 */
1042 }
1043
1044 /*
1045 * Having forked off the workers, disable SIGPIPE so that master isn't
1046 * killed if it tries to send a command to a dead worker. We don't want
1047 * the workers to inherit this setting, though.
1048 */
1049#ifndef WIN32
1050 pqsignal(SIGPIPE, SIG_IGN);
1051#endif
1052
1053 /*
1054 * Re-establish query cancellation on the master connection.
1055 */
1056 set_archive_cancel_info(AH, AH->connection);
1057
1058 /*
1059 * Tell the cancel signal handler to forward signals to worker processes,
1060 * too. (As with query cancel, we did not need this earlier because the
1061 * workers have not yet been given anything to do; if we die before this
1062 * point, any already-started workers will see EOF and quit promptly.)
1063 */
1064 set_cancel_pstate(pstate);
1065
1066 return pstate;
1067}
1068
1069/*
1070 * Close down a parallel dump or restore.
1071 */
1072void
1073ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1074{
1075 int i;
1076
1077 /* No work if non-parallel */
1078 if (pstate->numWorkers == 1)
1079 return;
1080
1081 /* There should not be any unfinished jobs */
1082 Assert(IsEveryWorkerIdle(pstate));
1083
1084 /* Close the sockets so that the workers know they can exit */
1085 for (i = 0; i < pstate->numWorkers; i++)
1086 {
1087 closesocket(pstate->parallelSlot[i].pipeRead);
1088 closesocket(pstate->parallelSlot[i].pipeWrite);
1089 }
1090
1091 /* Wait for them to exit */
1092 WaitForTerminatingWorkers(pstate);
1093
1094 /*
1095 * Unlink pstate from shutdown_info, so the exit handler will not try to
1096 * use it; and likewise unlink from signal_info.
1097 */
1098 shutdown_info.pstate = NULL;
1099 set_cancel_pstate(NULL);
1100
1101 /* Release state (mere neatnik-ism, since we're about to terminate) */
1102 free(pstate->te);
1103 free(pstate->parallelSlot);
1104 free(pstate);
1105}
1106
1107/*
1108 * These next four functions handle construction and parsing of the command
1109 * strings and response strings for parallel workers.
1110 *
1111 * Currently, these can be the same regardless of which archive format we are
1112 * processing. In future, we might want to let format modules override these
1113 * functions to add format-specific data to a command or response.
1114 */
1115
1116/*
1117 * buildWorkerCommand: format a command string to send to a worker.
1118 *
1119 * The string is built in the caller-supplied buffer of size buflen.
1120 */
1121static void
1122buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1123 char *buf, int buflen)
1124{
1125 if (act == ACT_DUMP)
1126 snprintf(buf, buflen, "DUMP %d", te->dumpId);
1127 else if (act == ACT_RESTORE)
1128 snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1129 else
1130 Assert(false);
1131}
1132
1133/*
1134 * parseWorkerCommand: interpret a command string in a worker.
1135 */
1136static void
1137parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1138 const char *msg)
1139{
1140 DumpId dumpId;
1141 int nBytes;
1142
1143 if (messageStartsWith(msg, "DUMP "))
1144 {
1145 *act = ACT_DUMP;
1146 sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1147 Assert(nBytes == strlen(msg));
1148 *te = getTocEntryByDumpId(AH, dumpId);
1149 Assert(*te != NULL);
1150 }
1151 else if (messageStartsWith(msg, "RESTORE "))
1152 {
1153 *act = ACT_RESTORE;
1154 sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1155 Assert(nBytes == strlen(msg));
1156 *te = getTocEntryByDumpId(AH, dumpId);
1157 Assert(*te != NULL);
1158 }
1159 else
1160 fatal("unrecognized command received from master: \"%s\"",
1161 msg);
1162}
1163
1164/*
1165 * buildWorkerResponse: format a response string to send to the master.
1166 *
1167 * The string is built in the caller-supplied buffer of size buflen.
1168 */
1169static void
1170buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1171 char *buf, int buflen)
1172{
1173 snprintf(buf, buflen, "OK %d %d %d",
1174 te->dumpId,
1175 status,
1176 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1177}
1178
1179/*
1180 * parseWorkerResponse: parse the status message returned by a worker.
1181 *
1182 * Returns the integer status code, and may update fields of AH and/or te.
1183 */
1184static int
1185parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1186 const char *msg)
1187{
1188 DumpId dumpId;
1189 int nBytes,
1190 n_errors;
1191 int status = 0;
1192
1193 if (messageStartsWith(msg, "OK "))
1194 {
1195 sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1196
1197 Assert(dumpId == te->dumpId);
1198 Assert(nBytes == strlen(msg));
1199
1200 AH->public.n_errors += n_errors;
1201 }
1202 else
1203 fatal("invalid message received from worker: \"%s\"",
1204 msg);
1205
1206 return status;
1207}
1208
1209/*
1210 * Dispatch a job to some free worker.
1211 *
1212 * te is the TocEntry to be processed, act is the action to be taken on it.
1213 * callback is the function to call on completion of the job.
1214 *
1215 * If no worker is currently available, this will block, and previously
1216 * registered callback functions may be called.
1217 */
1218void
1219DispatchJobForTocEntry(ArchiveHandle *AH,
1220 ParallelState *pstate,
1221 TocEntry *te,
1222 T_Action act,
1223 ParallelCompletionPtr callback,
1224 void *callback_data)
1225{
1226 int worker;
1227 char buf[256];
1228
1229 /* Get a worker, waiting if none are idle */
1230 while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1231 WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1232
1233 /* Construct and send command string */
1234 buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1235
1236 sendMessageToWorker(pstate, worker, buf);
1237
1238 /* Remember worker is busy, and which TocEntry it's working on */
1239 pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1240 pstate->parallelSlot[worker].callback = callback;
1241 pstate->parallelSlot[worker].callback_data = callback_data;
1242 pstate->te[worker] = te;
1243}
1244
1245/*
1246 * Find an idle worker and return its slot number.
1247 * Return NO_SLOT if none are idle.
1248 */
1249static int
1250GetIdleWorker(ParallelState *pstate)
1251{
1252 int i;
1253
1254 for (i = 0; i < pstate->numWorkers; i++)
1255 {
1256 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1257 return i;
1258 }
1259 return NO_SLOT;
1260}
1261
1262/*
1263 * Return true iff every worker is in the WRKR_TERMINATED state.
1264 */
1265static bool
1266HasEveryWorkerTerminated(ParallelState *pstate)
1267{
1268 int i;
1269
1270 for (i = 0; i < pstate->numWorkers; i++)
1271 {
1272 if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
1273 return false;
1274 }
1275 return true;
1276}
1277
1278/*
1279 * Return true iff every worker is in the WRKR_IDLE state.
1280 */
1281bool
1282IsEveryWorkerIdle(ParallelState *pstate)
1283{
1284 int i;
1285
1286 for (i = 0; i < pstate->numWorkers; i++)
1287 {
1288 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1289 return false;
1290 }
1291 return true;
1292}
1293
1294/*
1295 * Acquire lock on a table to be dumped by a worker process.
1296 *
1297 * The master process is already holding an ACCESS SHARE lock. Ordinarily
1298 * it's no problem for a worker to get one too, but if anything else besides
1299 * pg_dump is running, there's a possible deadlock:
1300 *
1301 * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
1302 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1303 * because the master holds a conflicting ACCESS SHARE lock).
1304 * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1305 * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1306 * 4) Now we have a deadlock, since the master is effectively waiting for
1307 * the worker. The server cannot detect that, however.
1308 *
1309 * To prevent an infinite wait, prior to touching a table in a worker, request
1310 * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1311 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1312 * so we have a deadlock. We must fail the backup in that case.
1313 */
1314static void
1315lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1316{
1317 const char *qualId;
1318 PQExpBuffer query;
1319 PGresult *res;
1320
1321 /* Nothing to do for BLOBS */
1322 if (strcmp(te->desc, "BLOBS") == 0)
1323 return;
1324
1325 query = createPQExpBuffer();
1326
1327 qualId = fmtQualifiedId(te->namespace, te->tag);
1328
1329 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1330 qualId);
1331
1332 res = PQexec(AH->connection, query->data);
1333
1334 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1335 fatal("could not obtain lock on relation \"%s\"\n"
1336 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1337 "on the table after the pg_dump parent process had gotten the "
1338 "initial ACCESS SHARE lock on the table.", qualId);
1339
1340 PQclear(res);
1341 destroyPQExpBuffer(query);
1342}
1343
1344/*
1345 * WaitForCommands: main routine for a worker process.
1346 *
1347 * Read and execute commands from the master until we see EOF on the pipe.
1348 */
1349static void
1350WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1351{
1352 char *command;
1353 TocEntry *te;
1354 T_Action act;
1355 int status = 0;
1356 char buf[256];
1357
1358 for (;;)
1359 {
1360 if (!(command = getMessageFromMaster(pipefd)))
1361 {
1362 /* EOF, so done */
1363 return;
1364 }
1365
1366 /* Decode the command */
1367 parseWorkerCommand(AH, &te, &act, command);
1368
1369 if (act == ACT_DUMP)
1370 {
1371 /* Acquire lock on this table within the worker's session */
1372 lockTableForWorker(AH, te);
1373
1374 /* Perform the dump command */
1375 status = (AH->WorkerJobDumpPtr) (AH, te);
1376 }
1377 else if (act == ACT_RESTORE)
1378 {
1379 /* Perform the restore command */
1380 status = (AH->WorkerJobRestorePtr) (AH, te);
1381 }
1382 else
1383 Assert(false);
1384
1385 /* Return status to master */
1386 buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1387
1388 sendMessageToMaster(pipefd, buf);
1389
1390 /* command was pg_malloc'd and we are responsible for free()ing it. */
1391 free(command);
1392 }
1393}
1394
1395/*
1396 * Check for status messages from workers.
1397 *
1398 * If do_wait is true, wait to get a status message; otherwise, just return
1399 * immediately if there is none available.
1400 *
1401 * When we get a status message, we pass the status code to the callback
1402 * function that was specified to DispatchJobForTocEntry, then reset the
1403 * worker status to IDLE.
1404 *
1405 * Returns true if we collected a status message, else false.
1406 *
1407 * XXX is it worth checking for more than one status message per call?
1408 * It seems somewhat unlikely that multiple workers would finish at exactly
1409 * the same time.
1410 */
1411static bool
1412ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1413{
1414 int worker;
1415 char *msg;
1416
1417 /* Try to collect a status message */
1418 msg = getMessageFromWorker(pstate, do_wait, &worker);
1419
1420 if (!msg)
1421 {
1422 /* If do_wait is true, we must have detected EOF on some socket */
1423 if (do_wait)
1424 fatal("a worker process died unexpectedly");
1425 return false;
1426 }
1427
1428 /* Process it and update our idea of the worker's status */
1429 if (messageStartsWith(msg, "OK "))
1430 {
1431 ParallelSlot *slot = &pstate->parallelSlot[worker];
1432 TocEntry *te = pstate->te[worker];
1433 int status;
1434
1435 status = parseWorkerResponse(AH, te, msg);
1436 slot->callback(AH, te, status, slot->callback_data);
1437 slot->workerStatus = WRKR_IDLE;
1438 pstate->te[worker] = NULL;
1439 }
1440 else
1441 fatal("invalid message received from worker: \"%s\"",
1442 msg);
1443
1444 /* Free the string returned from getMessageFromWorker */
1445 free(msg);
1446
1447 return true;
1448}
1449
1450/*
1451 * Check for status results from workers, waiting if necessary.
1452 *
1453 * Available wait modes are:
1454 * WFW_NO_WAIT: reap any available status, but don't block
1455 * WFW_GOT_STATUS: wait for at least one more worker to finish
1456 * WFW_ONE_IDLE: wait for at least one worker to be idle
1457 * WFW_ALL_IDLE: wait for all workers to be idle
1458 *
1459 * Any received results are passed to the callback specified to
1460 * DispatchJobForTocEntry.
1461 *
1462 * This function is executed in the master process.
1463 */
1464void
1465WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1466{
1467 bool do_wait = false;
1468
1469 /*
1470 * In GOT_STATUS mode, always block waiting for a message, since we can't
1471 * return till we get something. In other modes, we don't block the first
1472 * time through the loop.
1473 */
1474 if (mode == WFW_GOT_STATUS)
1475 {
1476 /* Assert that caller knows what it's doing */
1477 Assert(!IsEveryWorkerIdle(pstate));
1478 do_wait = true;
1479 }
1480
1481 for (;;)
1482 {
1483 /*
1484 * Check for status messages, even if we don't need to block. We do
1485 * not try very hard to reap all available messages, though, since
1486 * there's unlikely to be more than one.
1487 */
1488 if (ListenToWorkers(AH, pstate, do_wait))
1489 {
1490 /*
1491 * If we got a message, we are done by definition for GOT_STATUS
1492 * mode, and we can also be certain that there's at least one idle
1493 * worker. So we're done in all but ALL_IDLE mode.
1494 */
1495 if (mode != WFW_ALL_IDLE)
1496 return;
1497 }
1498
1499 /* Check whether we must wait for new status messages */
1500 switch (mode)
1501 {
1502 case WFW_NO_WAIT:
1503 return; /* never wait */
1504 case WFW_GOT_STATUS:
1505 Assert(false); /* can't get here, because we waited */
1506 break;
1507 case WFW_ONE_IDLE:
1508 if (GetIdleWorker(pstate) != NO_SLOT)
1509 return;
1510 break;
1511 case WFW_ALL_IDLE:
1512 if (IsEveryWorkerIdle(pstate))
1513 return;
1514 break;
1515 }
1516
1517 /* Loop back, and this time wait for something to happen */
1518 do_wait = true;
1519 }
1520}
1521
1522/*
1523 * Read one command message from the master, blocking if necessary
1524 * until one is available, and return it as a malloc'd string.
1525 * On EOF, return NULL.
1526 *
1527 * This function is executed in worker processes.
1528 */
1529static char *
1530getMessageFromMaster(int pipefd[2])
1531{
1532 return readMessageFromPipe(pipefd[PIPE_READ]);
1533}
1534
1535/*
1536 * Send a status message to the master.
1537 *
1538 * This function is executed in worker processes.
1539 */
1540static void
1541sendMessageToMaster(int pipefd[2], const char *str)
1542{
1543 int len = strlen(str) + 1;
1544
1545 if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1546 fatal("could not write to the communication channel: %m");
1547}
1548
1549/*
1550 * Wait until some descriptor in "workerset" becomes readable.
1551 * Returns -1 on error, else the number of readable descriptors.
1552 */
1553static int
1554select_loop(int maxFd, fd_set *workerset)
1555{
1556 int i;
1557 fd_set saveSet = *workerset;
1558
1559 for (;;)
1560 {
1561 *workerset = saveSet;
1562 i = select(maxFd + 1, workerset, NULL, NULL, NULL);
1563
1564#ifndef WIN32
1565 if (i < 0 && errno == EINTR)
1566 continue;
1567#else
1568 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1569 continue;
1570#endif
1571 break;
1572 }
1573
1574 return i;
1575}
1576
1577
1578/*
1579 * Check for messages from worker processes.
1580 *
1581 * If a message is available, return it as a malloc'd string, and put the
1582 * index of the sending worker in *worker.
1583 *
1584 * If nothing is available, wait if "do_wait" is true, else return NULL.
1585 *
1586 * If we detect EOF on any socket, we'll return NULL. It's not great that
1587 * that's hard to distinguish from the no-data-available case, but for now
1588 * our one caller is okay with that.
1589 *
1590 * This function is executed in the master process.
1591 */
1592static char *
1593getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1594{
1595 int i;
1596 fd_set workerset;
1597 int maxFd = -1;
1598 struct timeval nowait = {0, 0};
1599
1600 /* construct bitmap of socket descriptors for select() */
1601 FD_ZERO(&workerset);
1602 for (i = 0; i < pstate->numWorkers; i++)
1603 {
1604 if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
1605 continue;
1606 FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1607 if (pstate->parallelSlot[i].pipeRead > maxFd)
1608 maxFd = pstate->parallelSlot[i].pipeRead;
1609 }
1610
1611 if (do_wait)
1612 {
1613 i = select_loop(maxFd, &workerset);
1614 Assert(i != 0);
1615 }
1616 else
1617 {
1618 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1619 return NULL;
1620 }
1621
1622 if (i < 0)
1623 fatal("select() failed: %m");
1624
1625 for (i = 0; i < pstate->numWorkers; i++)
1626 {
1627 char *msg;
1628
1629 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1630 continue;
1631
1632 /*
1633 * Read the message if any. If the socket is ready because of EOF,
1634 * we'll return NULL instead (and the socket will stay ready, so the
1635 * condition will persist).
1636 *
1637 * Note: because this is a blocking read, we'll wait if only part of
1638 * the message is available. Waiting a long time would be bad, but
1639 * since worker status messages are short and are always sent in one
1640 * operation, it shouldn't be a problem in practice.
1641 */
1642 msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1643 *worker = i;
1644 return msg;
1645 }
1646 Assert(false);
1647 return NULL;
1648}
1649
1650/*
1651 * Send a command message to the specified worker process.
1652 *
1653 * This function is executed in the master process.
1654 */
1655static void
1656sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1657{
1658 int len = strlen(str) + 1;
1659
1660 if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1661 {
1662 fatal("could not write to the communication channel: %m");
1663 }
1664}
1665
1666/*
1667 * Read one message from the specified pipe (fd), blocking if necessary
1668 * until one is available, and return it as a malloc'd string.
1669 * On EOF, return NULL.
1670 *
1671 * A "message" on the channel is just a null-terminated string.
1672 */
1673static char *
1674readMessageFromPipe(int fd)
1675{
1676 char *msg;
1677 int msgsize,
1678 bufsize;
1679 int ret;
1680
1681 /*
1682 * In theory, if we let piperead() read multiple bytes, it might give us
1683 * back fragments of multiple messages. (That can't actually occur, since
1684 * neither master nor workers send more than one message without waiting
1685 * for a reply, but we don't wish to assume that here.) For simplicity,
1686 * read a byte at a time until we get the terminating '\0'. This method
1687 * is a bit inefficient, but since this is only used for relatively short
1688 * command and status strings, it shouldn't matter.
1689 */
1690 bufsize = 64; /* could be any number */
1691 msg = (char *) pg_malloc(bufsize);
1692 msgsize = 0;
1693 for (;;)
1694 {
1695 Assert(msgsize < bufsize);
1696 ret = piperead(fd, msg + msgsize, 1);
1697 if (ret <= 0)
1698 break; /* error or connection closure */
1699
1700 Assert(ret == 1);
1701
1702 if (msg[msgsize] == '\0')
1703 return msg; /* collected whole message */
1704
1705 msgsize++;
1706 if (msgsize == bufsize) /* enlarge buffer if needed */
1707 {
1708 bufsize += 16; /* could be any number */
1709 msg = (char *) pg_realloc(msg, bufsize);
1710 }
1711 }
1712
1713 /* Other end has closed the connection */
1714 pg_free(msg);
1715 return NULL;
1716}
1717
1718#ifdef WIN32
1719
1720/*
1721 * This is a replacement version of pipe(2) for Windows which allows the pipe
1722 * handles to be used in select().
1723 *
1724 * Reads and writes on the pipe must go through piperead()/pipewrite().
1725 *
1726 * For consistency with Unix we declare the returned handles as "int".
1727 * This is okay even on WIN64 because system handles are not more than
1728 * 32 bits wide, but we do have to do some casting.
1729 */
1730static int
1731pgpipe(int handles[2])
1732{
1733 pgsocket s,
1734 tmp_sock;
1735 struct sockaddr_in serv_addr;
1736 int len = sizeof(serv_addr);
1737
1738 /* We have to use the Unix socket invalid file descriptor value here. */
1739 handles[0] = handles[1] = -1;
1740
1741 /*
1742 * setup listen socket
1743 */
1744 if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1745 {
1746 pg_log_error("pgpipe: could not create socket: error code %d",
1747 WSAGetLastError());
1748 return -1;
1749 }
1750
1751 memset((void *) &serv_addr, 0, sizeof(serv_addr));
1752 serv_addr.sin_family = AF_INET;
1753 serv_addr.sin_port = pg_hton16(0);
1754 serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1755 if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1756 {
1757 pg_log_error("pgpipe: could not bind: error code %d",
1758 WSAGetLastError());
1759 closesocket(s);
1760 return -1;
1761 }
1762 if (listen(s, 1) == SOCKET_ERROR)
1763 {
1764 pg_log_error("pgpipe: could not listen: error code %d",
1765 WSAGetLastError());
1766 closesocket(s);
1767 return -1;
1768 }
1769 if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1770 {
1771 pg_log_error("pgpipe: getsockname() failed: error code %d",
1772 WSAGetLastError());
1773 closesocket(s);
1774 return -1;
1775 }
1776
1777 /*
1778 * setup pipe handles
1779 */
1780 if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1781 {
1782 pg_log_error("pgpipe: could not create second socket: error code %d",
1783 WSAGetLastError());
1784 closesocket(s);
1785 return -1;
1786 }
1787 handles[1] = (int) tmp_sock;
1788
1789 if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1790 {
1791 pg_log_error("pgpipe: could not connect socket: error code %d",
1792 WSAGetLastError());
1793 closesocket(handles[1]);
1794 handles[1] = -1;
1795 closesocket(s);
1796 return -1;
1797 }
1798 if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1799 {
1800 pg_log_error("pgpipe: could not accept connection: error code %d",
1801 WSAGetLastError());
1802 closesocket(handles[1]);
1803 handles[1] = -1;
1804 closesocket(s);
1805 return -1;
1806 }
1807 handles[0] = (int) tmp_sock;
1808
1809 closesocket(s);
1810 return 0;
1811}
1812
1813/*
1814 * Windows implementation of reading from a pipe.
1815 */
1816static int
1817piperead(int s, char *buf, int len)
1818{
1819 int ret = recv(s, buf, len, 0);
1820
1821 if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
1822 {
1823 /* EOF on the pipe! */
1824 ret = 0;
1825 }
1826 return ret;
1827}
1828
1829#endif /* WIN32 */
1830