1/*-------------------------------------------------------------------------
2 *
3 * pg_backup_archiver.c
4 *
5 * Private implementation of the archiver routines.
6 *
7 * See the headers to pg_restore for more details.
8 *
9 * Copyright (c) 2000, Philip Warner
10 * Rights are granted to use this software in any way so long
11 * as this notice is not removed.
12 *
13 * The author is not responsible for loss or damages that may
14 * result from its use.
15 *
16 *
17 * IDENTIFICATION
18 * src/bin/pg_dump/pg_backup_archiver.c
19 *
20 *-------------------------------------------------------------------------
21 */
22#include "postgres_fe.h"
23
24#include <ctype.h>
25#include <fcntl.h>
26#include <unistd.h>
27#include <sys/stat.h>
28#include <sys/wait.h>
29#ifdef WIN32
30#include <io.h>
31#endif
32
33#include "parallel.h"
34#include "pg_backup_archiver.h"
35#include "pg_backup_db.h"
36#include "pg_backup_utils.h"
37#include "dumputils.h"
38#include "fe_utils/string_utils.h"
39
40#include "libpq/libpq-fs.h"
41
42#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
43#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
44
/*
 * Snapshot of an archive's output target, taken by SaveOutput() and handed
 * back to RestoreOutput() so that output can be redirected temporarily.
 */
struct _outputContext
{
	void	   *OF;				/* presumably a copy of the handle's OF */
	int			gzOut;			/* presumably a copy of the handle's gzOut */
};

typedef struct _outputContext OutputContext;
51
52/*
53 * State for tracking TocEntrys that are ready to process during a parallel
54 * restore. (This used to be a list, and we still call it that, though now
55 * it's really an array so that we can apply qsort to it.)
56 *
57 * tes[] is sized large enough that we can't overrun it.
58 * The valid entries are indexed first_te .. last_te inclusive.
59 * We periodically sort the array to bring larger-by-dataLength entries to
60 * the front; "sorted" is true if the valid entries are known sorted.
61 */
62typedef struct _parallelReadyList
63{
64 TocEntry **tes; /* Ready-to-dump TocEntrys */
65 int first_te; /* index of first valid entry in tes[] */
66 int last_te; /* index of last valid entry in tes[] */
67 bool sorted; /* are valid entries currently sorted? */
68} ParallelReadyList;
69
70
71static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
72 const int compression, bool dosync, ArchiveMode mode,
73 SetupWorkerPtrType setupWorkerPtr);
74static void _getObjectDescription(PQExpBuffer buf, TocEntry *te,
75 ArchiveHandle *AH);
76static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
77static char *sanitize_line(const char *str, bool want_hyphen);
78static void _doSetFixedOutputState(ArchiveHandle *AH);
79static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
80static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
81static void _becomeUser(ArchiveHandle *AH, const char *user);
82static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
83static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
84static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
85static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
86static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
87static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
88static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
89static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
90static RestorePass _tocEntryRestorePass(TocEntry *te);
91static bool _tocEntryIsACL(TocEntry *te);
92static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
93static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
94static void buildTocEntryArrays(ArchiveHandle *AH);
95static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
96static int _discoverArchiveFormat(ArchiveHandle *AH);
97
98static int RestoringToDB(ArchiveHandle *AH);
99static void dump_lo_buf(ArchiveHandle *AH);
100static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
101static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
102static OutputContext SaveOutput(ArchiveHandle *AH);
103static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
104
105static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
106static void restore_toc_entries_prefork(ArchiveHandle *AH,
107 TocEntry *pending_list);
108static void restore_toc_entries_parallel(ArchiveHandle *AH,
109 ParallelState *pstate,
110 TocEntry *pending_list);
111static void restore_toc_entries_postfork(ArchiveHandle *AH,
112 TocEntry *pending_list);
113static void pending_list_header_init(TocEntry *l);
114static void pending_list_append(TocEntry *l, TocEntry *te);
115static void pending_list_remove(TocEntry *te);
116static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
117static void ready_list_free(ParallelReadyList *ready_list);
118static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
119static void ready_list_remove(ParallelReadyList *ready_list, int i);
120static void ready_list_sort(ParallelReadyList *ready_list);
121static int TocEntrySizeCompare(const void *p1, const void *p2);
122static void move_to_ready_list(TocEntry *pending_list,
123 ParallelReadyList *ready_list,
124 RestorePass pass);
125static TocEntry *pop_next_work_item(ArchiveHandle *AH,
126 ParallelReadyList *ready_list,
127 ParallelState *pstate);
128static void mark_dump_job_done(ArchiveHandle *AH,
129 TocEntry *te,
130 int status,
131 void *callback_data);
132static void mark_restore_job_done(ArchiveHandle *AH,
133 TocEntry *te,
134 int status,
135 void *callback_data);
136static void fix_dependencies(ArchiveHandle *AH);
137static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
138static void repoint_table_dependencies(ArchiveHandle *AH);
139static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
140static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
141 ParallelReadyList *ready_list);
142static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
143static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
144
145static void StrictNamesCheck(RestoreOptions *ropt);
146
147
148/*
149 * Allocate a new DumpOptions block containing all default values.
150 */
151DumpOptions *
152NewDumpOptions(void)
153{
154 DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
155
156 InitDumpOptions(opts);
157 return opts;
158}
159
160/*
161 * Initialize a DumpOptions struct to all default values
162 */
163void
164InitDumpOptions(DumpOptions *opts)
165{
166 memset(opts, 0, sizeof(DumpOptions));
167 /* set any fields that shouldn't default to zeroes */
168 opts->include_everything = true;
169 opts->dumpSections = DUMP_UNSECTIONED;
170}
171
172/*
173 * Create a freshly allocated DumpOptions with options equivalent to those
174 * found in the given RestoreOptions.
175 */
176DumpOptions *
177dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
178{
179 DumpOptions *dopt = NewDumpOptions();
180
181 /* this is the inverse of what's at the end of pg_dump.c's main() */
182 dopt->outputClean = ropt->dropSchema;
183 dopt->dataOnly = ropt->dataOnly;
184 dopt->schemaOnly = ropt->schemaOnly;
185 dopt->if_exists = ropt->if_exists;
186 dopt->column_inserts = ropt->column_inserts;
187 dopt->dumpSections = ropt->dumpSections;
188 dopt->aclsSkip = ropt->aclsSkip;
189 dopt->outputSuperuser = ropt->superuser;
190 dopt->outputCreateDB = ropt->createDB;
191 dopt->outputNoOwner = ropt->noOwner;
192 dopt->outputNoTablespaces = ropt->noTablespace;
193 dopt->disable_triggers = ropt->disable_triggers;
194 dopt->use_setsessauth = ropt->use_setsessauth;
195 dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
196 dopt->dump_inserts = ropt->dump_inserts;
197 dopt->no_comments = ropt->no_comments;
198 dopt->no_publications = ropt->no_publications;
199 dopt->no_security_labels = ropt->no_security_labels;
200 dopt->no_subscriptions = ropt->no_subscriptions;
201 dopt->lockWaitTimeout = ropt->lockWaitTimeout;
202 dopt->include_everything = ropt->include_everything;
203 dopt->enable_row_security = ropt->enable_row_security;
204 dopt->sequence_data = ropt->sequence_data;
205
206 return dopt;
207}
208
209
/*
 * Wrapper functions.
 *
 * The objective is to make writing new formats and dumpers as simple
 * as possible, if necessary at the expense of extra function calls etc.
 *
 */
217
218/*
219 * The dump worker setup needs lots of knowledge of the internals of pg_dump,
220 * so It's defined in pg_dump.c and passed into OpenArchive. The restore worker
221 * setup doesn't need to know anything much, so it's defined here.
222 */
223static void
224setupRestoreWorker(Archive *AHX)
225{
226 ArchiveHandle *AH = (ArchiveHandle *) AHX;
227
228 AH->ReopenPtr(AH);
229}
230
231
232/* Create a new archive */
233/* Public */
234Archive *
235CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
236 const int compression, bool dosync, ArchiveMode mode,
237 SetupWorkerPtrType setupDumpWorker)
238
239{
240 ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, dosync,
241 mode, setupDumpWorker);
242
243 return (Archive *) AH;
244}
245
246/* Open an existing archive */
247/* Public */
248Archive *
249OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
250{
251 ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker);
252
253 return (Archive *) AH;
254}
255
256/* Public */
257void
258CloseArchive(Archive *AHX)
259{
260 int res = 0;
261 ArchiveHandle *AH = (ArchiveHandle *) AHX;
262
263 AH->ClosePtr(AH);
264
265 /* Close the output */
266 if (AH->gzOut)
267 res = GZCLOSE(AH->OF);
268 else if (AH->OF != stdout)
269 res = fclose(AH->OF);
270
271 if (res != 0)
272 fatal("could not close output file: %m");
273}
274
275/* Public */
276void
277SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
278{
279 /* Caller can omit dump options, in which case we synthesize them */
280 if (dopt == NULL && ropt != NULL)
281 dopt = dumpOptionsFromRestoreOptions(ropt);
282
283 /* Save options for later access */
284 AH->dopt = dopt;
285 AH->ropt = ropt;
286}
287
288/* Public */
289void
290ProcessArchiveRestoreOptions(Archive *AHX)
291{
292 ArchiveHandle *AH = (ArchiveHandle *) AHX;
293 RestoreOptions *ropt = AH->public.ropt;
294 TocEntry *te;
295 teSection curSection;
296
297 /* Decide which TOC entries will be dumped/restored, and mark them */
298 curSection = SECTION_PRE_DATA;
299 for (te = AH->toc->next; te != AH->toc; te = te->next)
300 {
301 /*
302 * When writing an archive, we also take this opportunity to check
303 * that we have generated the entries in a sane order that respects
304 * the section divisions. When reading, don't complain, since buggy
305 * old versions of pg_dump might generate out-of-order archives.
306 */
307 if (AH->mode != archModeRead)
308 {
309 switch (te->section)
310 {
311 case SECTION_NONE:
312 /* ok to be anywhere */
313 break;
314 case SECTION_PRE_DATA:
315 if (curSection != SECTION_PRE_DATA)
316 pg_log_warning("archive items not in correct section order");
317 break;
318 case SECTION_DATA:
319 if (curSection == SECTION_POST_DATA)
320 pg_log_warning("archive items not in correct section order");
321 break;
322 case SECTION_POST_DATA:
323 /* ok no matter which section we were in */
324 break;
325 default:
326 fatal("unexpected section code %d",
327 (int) te->section);
328 break;
329 }
330 }
331
332 if (te->section != SECTION_NONE)
333 curSection = te->section;
334
335 te->reqs = _tocEntryRequired(te, curSection, AH);
336 }
337
338 /* Enforce strict names checking */
339 if (ropt->strict_names)
340 StrictNamesCheck(ropt);
341}
342
343/* Public */
344void
345RestoreArchive(Archive *AHX)
346{
347 ArchiveHandle *AH = (ArchiveHandle *) AHX;
348 RestoreOptions *ropt = AH->public.ropt;
349 bool parallel_mode;
350 TocEntry *te;
351 OutputContext sav;
352
353 AH->stage = STAGE_INITIALIZING;
354
355 /*
356 * If we're going to do parallel restore, there are some restrictions.
357 */
358 parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
359 if (parallel_mode)
360 {
361 /* We haven't got round to making this work for all archive formats */
362 if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
363 fatal("parallel restore is not supported with this archive file format");
364
365 /* Doesn't work if the archive represents dependencies as OIDs */
366 if (AH->version < K_VERS_1_8)
367 fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
368
369 /*
370 * It's also not gonna work if we can't reopen the input file, so
371 * let's try that immediately.
372 */
373 AH->ReopenPtr(AH);
374 }
375
376 /*
377 * Make sure we won't need (de)compression we haven't got
378 */
379#ifndef HAVE_LIBZ
380 if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
381 {
382 for (te = AH->toc->next; te != AH->toc; te = te->next)
383 {
384 if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
385 fatal("cannot restore from compressed archive (compression not supported in this installation)");
386 }
387 }
388#endif
389
390 /*
391 * Prepare index arrays, so we can assume we have them throughout restore.
392 * It's possible we already did this, though.
393 */
394 if (AH->tocsByDumpId == NULL)
395 buildTocEntryArrays(AH);
396
397 /*
398 * If we're using a DB connection, then connect it.
399 */
400 if (ropt->useDB)
401 {
402 pg_log_info("connecting to database for restore");
403 if (AH->version < K_VERS_1_3)
404 fatal("direct database connections are not supported in pre-1.3 archives");
405
406 /*
407 * We don't want to guess at whether the dump will successfully
408 * restore; allow the attempt regardless of the version of the restore
409 * target.
410 */
411 AHX->minRemoteVersion = 0;
412 AHX->maxRemoteVersion = 9999999;
413
414 ConnectDatabase(AHX, ropt->dbname,
415 ropt->pghost, ropt->pgport, ropt->username,
416 ropt->promptPassword);
417
418 /*
419 * If we're talking to the DB directly, don't send comments since they
420 * obscure SQL when displaying errors
421 */
422 AH->noTocComments = 1;
423 }
424
425 /*
426 * Work out if we have an implied data-only restore. This can happen if
427 * the dump was data only or if the user has used a toc list to exclude
428 * all of the schema data. All we do is look for schema entries - if none
429 * are found then we set the dataOnly flag.
430 *
431 * We could scan for wanted TABLE entries, but that is not the same as
432 * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
433 */
434 if (!ropt->dataOnly)
435 {
436 int impliedDataOnly = 1;
437
438 for (te = AH->toc->next; te != AH->toc; te = te->next)
439 {
440 if ((te->reqs & REQ_SCHEMA) != 0)
441 { /* It's schema, and it's wanted */
442 impliedDataOnly = 0;
443 break;
444 }
445 }
446 if (impliedDataOnly)
447 {
448 ropt->dataOnly = impliedDataOnly;
449 pg_log_info("implied data-only restore");
450 }
451 }
452
453 /*
454 * Setup the output file if necessary.
455 */
456 sav = SaveOutput(AH);
457 if (ropt->filename || ropt->compression)
458 SetOutput(AH, ropt->filename, ropt->compression);
459
460 ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
461
462 if (AH->archiveRemoteVersion)
463 ahprintf(AH, "-- Dumped from database version %s\n",
464 AH->archiveRemoteVersion);
465 if (AH->archiveDumpVersion)
466 ahprintf(AH, "-- Dumped by pg_dump version %s\n",
467 AH->archiveDumpVersion);
468
469 ahprintf(AH, "\n");
470
471 if (AH->public.verbose)
472 dumpTimestamp(AH, "Started on", AH->createDate);
473
474 if (ropt->single_txn)
475 {
476 if (AH->connection)
477 StartTransaction(AHX);
478 else
479 ahprintf(AH, "BEGIN;\n\n");
480 }
481
482 /*
483 * Establish important parameter values right away.
484 */
485 _doSetFixedOutputState(AH);
486
487 AH->stage = STAGE_PROCESSING;
488
489 /*
490 * Drop the items at the start, in reverse order
491 */
492 if (ropt->dropSchema)
493 {
494 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
495 {
496 AH->currentTE = te;
497
498 /*
499 * In createDB mode, issue a DROP *only* for the database as a
500 * whole. Issuing drops against anything else would be wrong,
501 * because at this point we're connected to the wrong database.
502 * (The DATABASE PROPERTIES entry, if any, should be treated like
503 * the DATABASE entry.)
504 */
505 if (ropt->createDB)
506 {
507 if (strcmp(te->desc, "DATABASE") != 0 &&
508 strcmp(te->desc, "DATABASE PROPERTIES") != 0)
509 continue;
510 }
511
512 /* Otherwise, drop anything that's selected and has a dropStmt */
513 if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
514 {
515 pg_log_info("dropping %s %s", te->desc, te->tag);
516 /* Select owner and schema as necessary */
517 _becomeOwner(AH, te);
518 _selectOutputSchema(AH, te->namespace);
519
520 /*
521 * Now emit the DROP command, if the object has one. Note we
522 * don't necessarily emit it verbatim; at this point we add an
523 * appropriate IF EXISTS clause, if the user requested it.
524 */
525 if (*te->dropStmt != '\0')
526 {
527 if (!ropt->if_exists)
528 {
529 /* No --if-exists? Then just use the original */
530 ahprintf(AH, "%s", te->dropStmt);
531 }
532 else
533 {
534 /*
535 * Inject an appropriate spelling of "if exists". For
536 * large objects, we have a separate routine that
537 * knows how to do it, without depending on
538 * te->dropStmt; use that. For other objects we need
539 * to parse the command.
540 */
541 if (strncmp(te->desc, "BLOB", 4) == 0)
542 {
543 DropBlobIfExists(AH, te->catalogId.oid);
544 }
545 else
546 {
547 char *dropStmt = pg_strdup(te->dropStmt);
548 char *dropStmtOrig = dropStmt;
549 PQExpBuffer ftStmt = createPQExpBuffer();
550
551 /*
552 * Need to inject IF EXISTS clause after ALTER
553 * TABLE part in ALTER TABLE .. DROP statement
554 */
555 if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
556 {
557 appendPQExpBuffer(ftStmt,
558 "ALTER TABLE IF EXISTS");
559 dropStmt = dropStmt + 11;
560 }
561
562 /*
563 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
564 * not support the IF EXISTS clause, and therefore
565 * we simply emit the original command for DEFAULT
566 * objects (modulo the adjustment made above).
567 *
568 * Likewise, don't mess with DATABASE PROPERTIES.
569 *
570 * If we used CREATE OR REPLACE VIEW as a means of
571 * quasi-dropping an ON SELECT rule, that should
572 * be emitted unchanged as well.
573 *
574 * For other object types, we need to extract the
575 * first part of the DROP which includes the
576 * object type. Most of the time this matches
577 * te->desc, so search for that; however for the
578 * different kinds of CONSTRAINTs, we know to
579 * search for hardcoded "DROP CONSTRAINT" instead.
580 */
581 if (strcmp(te->desc, "DEFAULT") == 0 ||
582 strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
583 strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
584 appendPQExpBufferStr(ftStmt, dropStmt);
585 else
586 {
587 char buffer[40];
588 char *mark;
589
590 if (strcmp(te->desc, "CONSTRAINT") == 0 ||
591 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
592 strcmp(te->desc, "FK CONSTRAINT") == 0)
593 strcpy(buffer, "DROP CONSTRAINT");
594 else
595 snprintf(buffer, sizeof(buffer), "DROP %s",
596 te->desc);
597
598 mark = strstr(dropStmt, buffer);
599
600 if (mark)
601 {
602 *mark = '\0';
603 appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
604 dropStmt, buffer,
605 mark + strlen(buffer));
606 }
607 else
608 {
609 /* complain and emit unmodified command */
610 pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
611 dropStmtOrig);
612 appendPQExpBufferStr(ftStmt, dropStmt);
613 }
614 }
615
616 ahprintf(AH, "%s", ftStmt->data);
617
618 destroyPQExpBuffer(ftStmt);
619 pg_free(dropStmtOrig);
620 }
621 }
622 }
623 }
624 }
625
626 /*
627 * _selectOutputSchema may have set currSchema to reflect the effect
628 * of a "SET search_path" command it emitted. However, by now we may
629 * have dropped that schema; or it might not have existed in the first
630 * place. In either case the effective value of search_path will not
631 * be what we think. Forcibly reset currSchema so that we will
632 * re-establish the search_path setting when needed (after creating
633 * the schema).
634 *
635 * If we treated users as pg_dump'able objects then we'd need to reset
636 * currUser here too.
637 */
638 if (AH->currSchema)
639 free(AH->currSchema);
640 AH->currSchema = NULL;
641 }
642
643 if (parallel_mode)
644 {
645 /*
646 * In parallel mode, turn control over to the parallel-restore logic.
647 */
648 ParallelState *pstate;
649 TocEntry pending_list;
650
651 /* The archive format module may need some setup for this */
652 if (AH->PrepParallelRestorePtr)
653 AH->PrepParallelRestorePtr(AH);
654
655 pending_list_header_init(&pending_list);
656
657 /* This runs PRE_DATA items and then disconnects from the database */
658 restore_toc_entries_prefork(AH, &pending_list);
659 Assert(AH->connection == NULL);
660
661 /* ParallelBackupStart() will actually fork the processes */
662 pstate = ParallelBackupStart(AH);
663 restore_toc_entries_parallel(AH, pstate, &pending_list);
664 ParallelBackupEnd(AH, pstate);
665
666 /* reconnect the master and see if we missed something */
667 restore_toc_entries_postfork(AH, &pending_list);
668 Assert(AH->connection != NULL);
669 }
670 else
671 {
672 /*
673 * In serial mode, process everything in three phases: normal items,
674 * then ACLs, then matview refresh items. We might be able to skip
675 * one or both extra phases in some cases, eg data-only restores.
676 */
677 bool haveACL = false;
678 bool haveRefresh = false;
679
680 for (te = AH->toc->next; te != AH->toc; te = te->next)
681 {
682 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
683 continue; /* ignore if not to be dumped at all */
684
685 switch (_tocEntryRestorePass(te))
686 {
687 case RESTORE_PASS_MAIN:
688 (void) restore_toc_entry(AH, te, false);
689 break;
690 case RESTORE_PASS_ACL:
691 haveACL = true;
692 break;
693 case RESTORE_PASS_REFRESH:
694 haveRefresh = true;
695 break;
696 }
697 }
698
699 if (haveACL)
700 {
701 for (te = AH->toc->next; te != AH->toc; te = te->next)
702 {
703 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
704 _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
705 (void) restore_toc_entry(AH, te, false);
706 }
707 }
708
709 if (haveRefresh)
710 {
711 for (te = AH->toc->next; te != AH->toc; te = te->next)
712 {
713 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
714 _tocEntryRestorePass(te) == RESTORE_PASS_REFRESH)
715 (void) restore_toc_entry(AH, te, false);
716 }
717 }
718 }
719
720 if (ropt->single_txn)
721 {
722 if (AH->connection)
723 CommitTransaction(AHX);
724 else
725 ahprintf(AH, "COMMIT;\n\n");
726 }
727
728 if (AH->public.verbose)
729 dumpTimestamp(AH, "Completed on", time(NULL));
730
731 ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
732
733 /*
734 * Clean up & we're done.
735 */
736 AH->stage = STAGE_FINALIZING;
737
738 if (ropt->filename || ropt->compression)
739 RestoreOutput(AH, sav);
740
741 if (ropt->useDB)
742 DisconnectDatabase(&AH->public);
743}
744
745/*
746 * Restore a single TOC item. Used in both parallel and non-parallel restore;
747 * is_parallel is true if we are in a worker child process.
748 *
749 * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
750 * the parallel parent has to make the corresponding status update.
751 */
752static int
753restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
754{
755 RestoreOptions *ropt = AH->public.ropt;
756 int status = WORKER_OK;
757 teReqs reqs;
758 bool defnDumped;
759
760 AH->currentTE = te;
761
762 /* Dump any relevant dump warnings to stderr */
763 if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
764 {
765 if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
766 pg_log_warning("warning from original dump file: %s", te->defn);
767 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
768 pg_log_warning("warning from original dump file: %s", te->copyStmt);
769 }
770
771 /* Work out what, if anything, we want from this entry */
772 reqs = te->reqs;
773
774 defnDumped = false;
775
776 /*
777 * If it has a schema component that we want, then process that
778 */
779 if ((reqs & REQ_SCHEMA) != 0)
780 {
781 /* Show namespace in log message if available */
782 if (te->namespace)
783 pg_log_info("creating %s \"%s.%s\"",
784 te->desc, te->namespace, te->tag);
785 else
786 pg_log_info("creating %s \"%s\"",
787 te->desc, te->tag);
788
789 _printTocEntry(AH, te, false);
790 defnDumped = true;
791
792 if (strcmp(te->desc, "TABLE") == 0)
793 {
794 if (AH->lastErrorTE == te)
795 {
796 /*
797 * We failed to create the table. If
798 * --no-data-for-failed-tables was given, mark the
799 * corresponding TABLE DATA to be ignored.
800 *
801 * In the parallel case this must be done in the parent, so we
802 * just set the return value.
803 */
804 if (ropt->noDataForFailedTables)
805 {
806 if (is_parallel)
807 status = WORKER_INHIBIT_DATA;
808 else
809 inhibit_data_for_failed_table(AH, te);
810 }
811 }
812 else
813 {
814 /*
815 * We created the table successfully. Mark the corresponding
816 * TABLE DATA for possible truncation.
817 *
818 * In the parallel case this must be done in the parent, so we
819 * just set the return value.
820 */
821 if (is_parallel)
822 status = WORKER_CREATE_DONE;
823 else
824 mark_create_done(AH, te);
825 }
826 }
827
828 /*
829 * If we created a DB, connect to it. Also, if we changed DB
830 * properties, reconnect to ensure that relevant GUC settings are
831 * applied to our session.
832 */
833 if (strcmp(te->desc, "DATABASE") == 0 ||
834 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
835 {
836 PQExpBufferData connstr;
837
838 initPQExpBuffer(&connstr);
839 appendPQExpBufferStr(&connstr, "dbname=");
840 appendConnStrVal(&connstr, te->tag);
841 /* Abandon struct, but keep its buffer until process exit. */
842
843 pg_log_info("connecting to new database \"%s\"", te->tag);
844 _reconnectToDB(AH, te->tag);
845 ropt->dbname = connstr.data;
846 }
847 }
848
849 /*
850 * If it has a data component that we want, then process that
851 */
852 if ((reqs & REQ_DATA) != 0)
853 {
854 /*
855 * hadDumper will be set if there is genuine data component for this
856 * node. Otherwise, we need to check the defn field for statements
857 * that need to be executed in data-only restores.
858 */
859 if (te->hadDumper)
860 {
861 /*
862 * If we can output the data, then restore it.
863 */
864 if (AH->PrintTocDataPtr != NULL)
865 {
866 _printTocEntry(AH, te, true);
867
868 if (strcmp(te->desc, "BLOBS") == 0 ||
869 strcmp(te->desc, "BLOB COMMENTS") == 0)
870 {
871 pg_log_info("processing %s", te->desc);
872
873 _selectOutputSchema(AH, "pg_catalog");
874
875 /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
876 if (strcmp(te->desc, "BLOB COMMENTS") == 0)
877 AH->outputKind = OUTPUT_OTHERDATA;
878
879 AH->PrintTocDataPtr(AH, te);
880
881 AH->outputKind = OUTPUT_SQLCMDS;
882 }
883 else
884 {
885 _disableTriggersIfNecessary(AH, te);
886
887 /* Select owner and schema as necessary */
888 _becomeOwner(AH, te);
889 _selectOutputSchema(AH, te->namespace);
890
891 pg_log_info("processing data for table \"%s.%s\"",
892 te->namespace, te->tag);
893
894 /*
895 * In parallel restore, if we created the table earlier in
896 * the run then we wrap the COPY in a transaction and
897 * precede it with a TRUNCATE. If archiving is not on
898 * this prevents WAL-logging the COPY. This obtains a
899 * speedup similar to that from using single_txn mode in
900 * non-parallel restores.
901 */
902 if (is_parallel && te->created)
903 {
904 /*
905 * Parallel restore is always talking directly to a
906 * server, so no need to see if we should issue BEGIN.
907 */
908 StartTransaction(&AH->public);
909
910 /*
911 * If the server version is >= 8.4, make sure we issue
912 * TRUNCATE with ONLY so that child tables are not
913 * wiped.
914 */
915 ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
916 (PQserverVersion(AH->connection) >= 80400 ?
917 "ONLY " : ""),
918 fmtQualifiedId(te->namespace, te->tag));
919 }
920
921 /*
922 * If we have a copy statement, use it.
923 */
924 if (te->copyStmt && strlen(te->copyStmt) > 0)
925 {
926 ahprintf(AH, "%s", te->copyStmt);
927 AH->outputKind = OUTPUT_COPYDATA;
928 }
929 else
930 AH->outputKind = OUTPUT_OTHERDATA;
931
932 AH->PrintTocDataPtr(AH, te);
933
934 /*
935 * Terminate COPY if needed.
936 */
937 if (AH->outputKind == OUTPUT_COPYDATA &&
938 RestoringToDB(AH))
939 EndDBCopyMode(&AH->public, te->tag);
940 AH->outputKind = OUTPUT_SQLCMDS;
941
942 /* close out the transaction started above */
943 if (is_parallel && te->created)
944 CommitTransaction(&AH->public);
945
946 _enableTriggersIfNecessary(AH, te);
947 }
948 }
949 }
950 else if (!defnDumped)
951 {
952 /* If we haven't already dumped the defn part, do so now */
953 pg_log_info("executing %s %s", te->desc, te->tag);
954 _printTocEntry(AH, te, false);
955 }
956 }
957
958 if (AH->public.n_errors > 0 && status == WORKER_OK)
959 status = WORKER_IGNORED_ERRORS;
960
961 return status;
962}
963
964/*
965 * Allocate a new RestoreOptions block.
966 * This is mainly so we can initialize it, but also for future expansion,
967 */
968RestoreOptions *
969NewRestoreOptions(void)
970{
971 RestoreOptions *opts;
972
973 opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
974
975 /* set any fields that shouldn't default to zeroes */
976 opts->format = archUnknown;
977 opts->promptPassword = TRI_DEFAULT;
978 opts->dumpSections = DUMP_UNSECTIONED;
979
980 return opts;
981}
982
983static void
984_disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
985{
986 RestoreOptions *ropt = AH->public.ropt;
987
988 /* This hack is only needed in a data-only restore */
989 if (!ropt->dataOnly || !ropt->disable_triggers)
990 return;
991
992 pg_log_info("disabling triggers for %s", te->tag);
993
994 /*
995 * Become superuser if possible, since they are the only ones who can
996 * disable constraint triggers. If -S was not given, assume the initial
997 * user identity is a superuser. (XXX would it be better to become the
998 * table owner?)
999 */
1000 _becomeUser(AH, ropt->superuser);
1001
1002 /*
1003 * Disable them.
1004 */
1005 ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1006 fmtQualifiedId(te->namespace, te->tag));
1007}
1008
1009static void
1010_enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1011{
1012 RestoreOptions *ropt = AH->public.ropt;
1013
1014 /* This hack is only needed in a data-only restore */
1015 if (!ropt->dataOnly || !ropt->disable_triggers)
1016 return;
1017
1018 pg_log_info("enabling triggers for %s", te->tag);
1019
1020 /*
1021 * Become superuser if possible, since they are the only ones who can
1022 * disable constraint triggers. If -S was not given, assume the initial
1023 * user identity is a superuser. (XXX would it be better to become the
1024 * table owner?)
1025 */
1026 _becomeUser(AH, ropt->superuser);
1027
1028 /*
1029 * Enable them.
1030 */
1031 ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1032 fmtQualifiedId(te->namespace, te->tag));
1033}
1034
1035/*
1036 * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1037 */
1038
1039/* Public */
1040void
1041WriteData(Archive *AHX, const void *data, size_t dLen)
1042{
1043 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1044
1045 if (!AH->currToc)
1046 fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1047
1048 AH->WriteDataPtr(AH, data, dLen);
1049
1050 return;
1051}
1052
1053/*
1054 * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1055 * repository for all metadata. But the name has stuck.
1056 *
1057 * The new entry is added to the Archive's TOC list. Most callers can ignore
1058 * the result value because nothing else need be done, but a few want to
1059 * manipulate the TOC entry further.
1060 */
1061
1062/* Public */
1063TocEntry *
1064ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1065 ArchiveOpts *opts)
1066{
1067 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1068 TocEntry *newToc;
1069
1070 newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1071
1072 AH->tocCount++;
1073 if (dumpId > AH->maxDumpId)
1074 AH->maxDumpId = dumpId;
1075
1076 newToc->prev = AH->toc->prev;
1077 newToc->next = AH->toc;
1078 AH->toc->prev->next = newToc;
1079 AH->toc->prev = newToc;
1080
1081 newToc->catalogId = catalogId;
1082 newToc->dumpId = dumpId;
1083 newToc->section = opts->section;
1084
1085 newToc->tag = pg_strdup(opts->tag);
1086 newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1087 newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1088 newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1089 newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1090 newToc->desc = pg_strdup(opts->description);
1091 newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1092 newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1093 newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1094
1095 if (opts->nDeps > 0)
1096 {
1097 newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1098 memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1099 newToc->nDeps = opts->nDeps;
1100 }
1101 else
1102 {
1103 newToc->dependencies = NULL;
1104 newToc->nDeps = 0;
1105 }
1106
1107 newToc->dataDumper = opts->dumpFn;
1108 newToc->dataDumperArg = opts->dumpArg;
1109 newToc->hadDumper = opts->dumpFn ? true : false;
1110
1111 newToc->formatData = NULL;
1112 newToc->dataLength = 0;
1113
1114 if (AH->ArchiveEntryPtr != NULL)
1115 AH->ArchiveEntryPtr(AH, newToc);
1116
1117 return newToc;
1118}
1119
1120/* Public */
1121void
1122PrintTOCSummary(Archive *AHX)
1123{
1124 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1125 RestoreOptions *ropt = AH->public.ropt;
1126 TocEntry *te;
1127 teSection curSection;
1128 OutputContext sav;
1129 const char *fmtName;
1130 char stamp_str[64];
1131
1132 sav = SaveOutput(AH);
1133 if (ropt->filename)
1134 SetOutput(AH, ropt->filename, 0 /* no compression */ );
1135
1136 if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1137 localtime(&AH->createDate)) == 0)
1138 strcpy(stamp_str, "[unknown]");
1139
1140 ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1141 ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n",
1142 sanitize_line(AH->archdbname, false),
1143 AH->tocCount, AH->compression);
1144
1145 switch (AH->format)
1146 {
1147 case archCustom:
1148 fmtName = "CUSTOM";
1149 break;
1150 case archDirectory:
1151 fmtName = "DIRECTORY";
1152 break;
1153 case archTar:
1154 fmtName = "TAR";
1155 break;
1156 default:
1157 fmtName = "UNKNOWN";
1158 }
1159
1160 ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1161 ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
1162 ahprintf(AH, "; Format: %s\n", fmtName);
1163 ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1164 ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1165 if (AH->archiveRemoteVersion)
1166 ahprintf(AH, "; Dumped from database version: %s\n",
1167 AH->archiveRemoteVersion);
1168 if (AH->archiveDumpVersion)
1169 ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1170 AH->archiveDumpVersion);
1171
1172 ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1173
1174 curSection = SECTION_PRE_DATA;
1175 for (te = AH->toc->next; te != AH->toc; te = te->next)
1176 {
1177 if (te->section != SECTION_NONE)
1178 curSection = te->section;
1179 if (ropt->verbose ||
1180 (_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
1181 {
1182 char *sanitized_name;
1183 char *sanitized_schema;
1184 char *sanitized_owner;
1185
1186 /*
1187 */
1188 sanitized_name = sanitize_line(te->tag, false);
1189 sanitized_schema = sanitize_line(te->namespace, true);
1190 sanitized_owner = sanitize_line(te->owner, false);
1191
1192 ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1193 te->catalogId.tableoid, te->catalogId.oid,
1194 te->desc, sanitized_schema, sanitized_name,
1195 sanitized_owner);
1196
1197 free(sanitized_name);
1198 free(sanitized_schema);
1199 free(sanitized_owner);
1200 }
1201 if (ropt->verbose && te->nDeps > 0)
1202 {
1203 int i;
1204
1205 ahprintf(AH, ";\tdepends on:");
1206 for (i = 0; i < te->nDeps; i++)
1207 ahprintf(AH, " %d", te->dependencies[i]);
1208 ahprintf(AH, "\n");
1209 }
1210 }
1211
1212 /* Enforce strict names checking */
1213 if (ropt->strict_names)
1214 StrictNamesCheck(ropt);
1215
1216 if (ropt->filename)
1217 RestoreOutput(AH, sav);
1218}
1219
1220/***********
1221 * BLOB Archival
1222 ***********/
1223
1224/* Called by a dumper to signal start of a BLOB */
1225int
1226StartBlob(Archive *AHX, Oid oid)
1227{
1228 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1229
1230 if (!AH->StartBlobPtr)
1231 fatal("large-object output not supported in chosen format");
1232
1233 AH->StartBlobPtr(AH, AH->currToc, oid);
1234
1235 return 1;
1236}
1237
1238/* Called by a dumper to signal end of a BLOB */
1239int
1240EndBlob(Archive *AHX, Oid oid)
1241{
1242 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1243
1244 if (AH->EndBlobPtr)
1245 AH->EndBlobPtr(AH, AH->currToc, oid);
1246
1247 return 1;
1248}
1249
1250/**********
1251 * BLOB Restoration
1252 **********/
1253
1254/*
1255 * Called by a format handler before any blobs are restored
1256 */
1257void
1258StartRestoreBlobs(ArchiveHandle *AH)
1259{
1260 RestoreOptions *ropt = AH->public.ropt;
1261
1262 if (!ropt->single_txn)
1263 {
1264 if (AH->connection)
1265 StartTransaction(&AH->public);
1266 else
1267 ahprintf(AH, "BEGIN;\n\n");
1268 }
1269
1270 AH->blobCount = 0;
1271}
1272
1273/*
1274 * Called by a format handler after all blobs are restored
1275 */
1276void
1277EndRestoreBlobs(ArchiveHandle *AH)
1278{
1279 RestoreOptions *ropt = AH->public.ropt;
1280
1281 if (!ropt->single_txn)
1282 {
1283 if (AH->connection)
1284 CommitTransaction(&AH->public);
1285 else
1286 ahprintf(AH, "COMMIT;\n\n");
1287 }
1288
1289 pg_log_info(ngettext("restored %d large object",
1290 "restored %d large objects",
1291 AH->blobCount),
1292 AH->blobCount);
1293}
1294
1295
1296/*
1297 * Called by a format handler to initiate restoration of a blob
1298 */
1299void
1300StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
1301{
1302 bool old_blob_style = (AH->version < K_VERS_1_12);
1303 Oid loOid;
1304
1305 AH->blobCount++;
1306
1307 /* Initialize the LO Buffer */
1308 AH->lo_buf_used = 0;
1309
1310 pg_log_info("restoring large object with OID %u", oid);
1311
1312 /* With an old archive we must do drop and create logic here */
1313 if (old_blob_style && drop)
1314 DropBlobIfExists(AH, oid);
1315
1316 if (AH->connection)
1317 {
1318 if (old_blob_style)
1319 {
1320 loOid = lo_create(AH->connection, oid);
1321 if (loOid == 0 || loOid != oid)
1322 fatal("could not create large object %u: %s",
1323 oid, PQerrorMessage(AH->connection));
1324 }
1325 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1326 if (AH->loFd == -1)
1327 fatal("could not open large object %u: %s",
1328 oid, PQerrorMessage(AH->connection));
1329 }
1330 else
1331 {
1332 if (old_blob_style)
1333 ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1334 oid, INV_WRITE);
1335 else
1336 ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1337 oid, INV_WRITE);
1338 }
1339
1340 AH->writingBlob = 1;
1341}
1342
1343void
1344EndRestoreBlob(ArchiveHandle *AH, Oid oid)
1345{
1346 if (AH->lo_buf_used > 0)
1347 {
1348 /* Write remaining bytes from the LO buffer */
1349 dump_lo_buf(AH);
1350 }
1351
1352 AH->writingBlob = 0;
1353
1354 if (AH->connection)
1355 {
1356 lo_close(AH->connection, AH->loFd);
1357 AH->loFd = -1;
1358 }
1359 else
1360 {
1361 ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1362 }
1363}
1364
1365/***********
1366 * Sorting and Reordering
1367 ***********/
1368
1369void
1370SortTocFromFile(Archive *AHX)
1371{
1372 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1373 RestoreOptions *ropt = AH->public.ropt;
1374 FILE *fh;
1375 char buf[100];
1376 bool incomplete_line;
1377
1378 /* Allocate space for the 'wanted' array, and init it */
1379 ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1380
1381 /* Setup the file */
1382 fh = fopen(ropt->tocFile, PG_BINARY_R);
1383 if (!fh)
1384 fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1385
1386 incomplete_line = false;
1387 while (fgets(buf, sizeof(buf), fh) != NULL)
1388 {
1389 bool prev_incomplete_line = incomplete_line;
1390 int buflen;
1391 char *cmnt;
1392 char *endptr;
1393 DumpId id;
1394 TocEntry *te;
1395
1396 /*
1397 * Some lines in the file might be longer than sizeof(buf). This is
1398 * no problem, since we only care about the leading numeric ID which
1399 * can be at most a few characters; but we have to skip continuation
1400 * bufferloads when processing a long line.
1401 */
1402 buflen = strlen(buf);
1403 if (buflen > 0 && buf[buflen - 1] == '\n')
1404 incomplete_line = false;
1405 else
1406 incomplete_line = true;
1407 if (prev_incomplete_line)
1408 continue;
1409
1410 /* Truncate line at comment, if any */
1411 cmnt = strchr(buf, ';');
1412 if (cmnt != NULL)
1413 cmnt[0] = '\0';
1414
1415 /* Ignore if all blank */
1416 if (strspn(buf, " \t\r\n") == strlen(buf))
1417 continue;
1418
1419 /* Get an ID, check it's valid and not already seen */
1420 id = strtol(buf, &endptr, 10);
1421 if (endptr == buf || id <= 0 || id > AH->maxDumpId ||
1422 ropt->idWanted[id - 1])
1423 {
1424 pg_log_warning("line ignored: %s", buf);
1425 continue;
1426 }
1427
1428 /* Find TOC entry */
1429 te = getTocEntryByDumpId(AH, id);
1430 if (!te)
1431 fatal("could not find entry for ID %d",
1432 id);
1433
1434 /* Mark it wanted */
1435 ropt->idWanted[id - 1] = true;
1436
1437 /*
1438 * Move each item to the end of the list as it is selected, so that
1439 * they are placed in the desired order. Any unwanted items will end
1440 * up at the front of the list, which may seem unintuitive but it's
1441 * what we need. In an ordinary serial restore that makes no
1442 * difference, but in a parallel restore we need to mark unrestored
1443 * items' dependencies as satisfied before we start examining
1444 * restorable items. Otherwise they could have surprising
1445 * side-effects on the order in which restorable items actually get
1446 * restored.
1447 */
1448 _moveBefore(AH, AH->toc, te);
1449 }
1450
1451 if (fclose(fh) != 0)
1452 fatal("could not close TOC file: %m");
1453}
1454
/**********************
 * Convenience functions that look like standard IO functions
 * for writing data when in dump mode.
 **********************/
1459
1460/* Public */
1461void
1462archputs(const char *s, Archive *AH)
1463{
1464 WriteData(AH, s, strlen(s));
1465 return;
1466}
1467
1468/* Public */
1469int
1470archprintf(Archive *AH, const char *fmt,...)
1471{
1472 int save_errno = errno;
1473 char *p;
1474 size_t len = 128; /* initial assumption about buffer size */
1475 size_t cnt;
1476
1477 for (;;)
1478 {
1479 va_list args;
1480
1481 /* Allocate work buffer. */
1482 p = (char *) pg_malloc(len);
1483
1484 /* Try to format the data. */
1485 errno = save_errno;
1486 va_start(args, fmt);
1487 cnt = pvsnprintf(p, len, fmt, args);
1488 va_end(args);
1489
1490 if (cnt < len)
1491 break; /* success */
1492
1493 /* Release buffer and loop around to try again with larger len. */
1494 free(p);
1495 len = cnt;
1496 }
1497
1498 WriteData(AH, p, cnt);
1499 free(p);
1500 return (int) cnt;
1501}
1502
1503
1504/*******************************
1505 * Stuff below here should be 'private' to the archiver routines
1506 *******************************/
1507
1508static void
1509SetOutput(ArchiveHandle *AH, const char *filename, int compression)
1510{
1511 int fn;
1512
1513 if (filename)
1514 {
1515 if (strcmp(filename, "-") == 0)
1516 fn = fileno(stdout);
1517 else
1518 fn = -1;
1519 }
1520 else if (AH->FH)
1521 fn = fileno(AH->FH);
1522 else if (AH->fSpec)
1523 {
1524 fn = -1;
1525 filename = AH->fSpec;
1526 }
1527 else
1528 fn = fileno(stdout);
1529
1530 /* If compression explicitly requested, use gzopen */
1531#ifdef HAVE_LIBZ
1532 if (compression != 0)
1533 {
1534 char fmode[14];
1535
1536 /* Don't use PG_BINARY_x since this is zlib */
1537 sprintf(fmode, "wb%d", compression);
1538 if (fn >= 0)
1539 AH->OF = gzdopen(dup(fn), fmode);
1540 else
1541 AH->OF = gzopen(filename, fmode);
1542 AH->gzOut = 1;
1543 }
1544 else
1545#endif
1546 { /* Use fopen */
1547 if (AH->mode == archModeAppend)
1548 {
1549 if (fn >= 0)
1550 AH->OF = fdopen(dup(fn), PG_BINARY_A);
1551 else
1552 AH->OF = fopen(filename, PG_BINARY_A);
1553 }
1554 else
1555 {
1556 if (fn >= 0)
1557 AH->OF = fdopen(dup(fn), PG_BINARY_W);
1558 else
1559 AH->OF = fopen(filename, PG_BINARY_W);
1560 }
1561 AH->gzOut = 0;
1562 }
1563
1564 if (!AH->OF)
1565 {
1566 if (filename)
1567 fatal("could not open output file \"%s\": %m", filename);
1568 else
1569 fatal("could not open output file: %m");
1570 }
1571}
1572
1573static OutputContext
1574SaveOutput(ArchiveHandle *AH)
1575{
1576 OutputContext sav;
1577
1578 sav.OF = AH->OF;
1579 sav.gzOut = AH->gzOut;
1580
1581 return sav;
1582}
1583
1584static void
1585RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
1586{
1587 int res;
1588
1589 if (AH->gzOut)
1590 res = GZCLOSE(AH->OF);
1591 else
1592 res = fclose(AH->OF);
1593
1594 if (res != 0)
1595 fatal("could not close output file: %m");
1596
1597 AH->gzOut = savedContext.gzOut;
1598 AH->OF = savedContext.OF;
1599}
1600
1601
1602
1603/*
1604 * Print formatted text to the output file (usually stdout).
1605 */
1606int
1607ahprintf(ArchiveHandle *AH, const char *fmt,...)
1608{
1609 int save_errno = errno;
1610 char *p;
1611 size_t len = 128; /* initial assumption about buffer size */
1612 size_t cnt;
1613
1614 for (;;)
1615 {
1616 va_list args;
1617
1618 /* Allocate work buffer. */
1619 p = (char *) pg_malloc(len);
1620
1621 /* Try to format the data. */
1622 errno = save_errno;
1623 va_start(args, fmt);
1624 cnt = pvsnprintf(p, len, fmt, args);
1625 va_end(args);
1626
1627 if (cnt < len)
1628 break; /* success */
1629
1630 /* Release buffer and loop around to try again with larger len. */
1631 free(p);
1632 len = cnt;
1633 }
1634
1635 ahwrite(p, 1, cnt, AH);
1636 free(p);
1637 return (int) cnt;
1638}
1639
1640/*
1641 * Single place for logic which says 'We are restoring to a direct DB connection'.
1642 */
1643static int
1644RestoringToDB(ArchiveHandle *AH)
1645{
1646 RestoreOptions *ropt = AH->public.ropt;
1647
1648 return (ropt && ropt->useDB && AH->connection);
1649}
1650
1651/*
1652 * Dump the current contents of the LO data buffer while writing a BLOB
1653 */
1654static void
1655dump_lo_buf(ArchiveHandle *AH)
1656{
1657 if (AH->connection)
1658 {
1659 size_t res;
1660
1661 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1662 pg_log_debug(ngettext("wrote %lu byte of large object data (result = %lu)",
1663 "wrote %lu bytes of large object data (result = %lu)",
1664 AH->lo_buf_used),
1665 (unsigned long) AH->lo_buf_used, (unsigned long) res);
1666 if (res != AH->lo_buf_used)
1667 fatal("could not write to large object (result: %lu, expected: %lu)",
1668 (unsigned long) res, (unsigned long) AH->lo_buf_used);
1669 }
1670 else
1671 {
1672 PQExpBuffer buf = createPQExpBuffer();
1673
1674 appendByteaLiteralAHX(buf,
1675 (const unsigned char *) AH->lo_buf,
1676 AH->lo_buf_used,
1677 AH);
1678
1679 /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1680 AH->writingBlob = 0;
1681 ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1682 AH->writingBlob = 1;
1683
1684 destroyPQExpBuffer(buf);
1685 }
1686 AH->lo_buf_used = 0;
1687}
1688
1689
1690/*
1691 * Write buffer to the output file (usually stdout). This is used for
1692 * outputting 'restore' scripts etc. It is even possible for an archive
1693 * format to create a custom output routine to 'fake' a restore if it
1694 * wants to generate a script (see TAR output).
1695 */
1696void
1697ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1698{
1699 int bytes_written = 0;
1700
1701 if (AH->writingBlob)
1702 {
1703 size_t remaining = size * nmemb;
1704
1705 while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1706 {
1707 size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1708
1709 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1710 ptr = (const void *) ((const char *) ptr + avail);
1711 remaining -= avail;
1712 AH->lo_buf_used += avail;
1713 dump_lo_buf(AH);
1714 }
1715
1716 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1717 AH->lo_buf_used += remaining;
1718
1719 bytes_written = size * nmemb;
1720 }
1721 else if (AH->gzOut)
1722 bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
1723 else if (AH->CustomOutPtr)
1724 bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1725
1726 else
1727 {
1728 /*
1729 * If we're doing a restore, and it's direct to DB, and we're
1730 * connected then send it to the DB.
1731 */
1732 if (RestoringToDB(AH))
1733 bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1734 else
1735 bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
1736 }
1737
1738 if (bytes_written != size * nmemb)
1739 WRITE_ERROR_EXIT;
1740
1741 return;
1742}
1743
1744/* on some error, we may decide to go on... */
1745void
1746warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1747{
1748 va_list ap;
1749
1750 switch (AH->stage)
1751 {
1752
1753 case STAGE_NONE:
1754 /* Do nothing special */
1755 break;
1756
1757 case STAGE_INITIALIZING:
1758 if (AH->stage != AH->lastErrorStage)
1759 pg_log_generic(PG_LOG_INFO, "while INITIALIZING:");
1760 break;
1761
1762 case STAGE_PROCESSING:
1763 if (AH->stage != AH->lastErrorStage)
1764 pg_log_generic(PG_LOG_INFO, "while PROCESSING TOC:");
1765 break;
1766
1767 case STAGE_FINALIZING:
1768 if (AH->stage != AH->lastErrorStage)
1769 pg_log_generic(PG_LOG_INFO, "while FINALIZING:");
1770 break;
1771 }
1772 if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1773 {
1774 pg_log_generic(PG_LOG_INFO, "from TOC entry %d; %u %u %s %s %s",
1775 AH->currentTE->dumpId,
1776 AH->currentTE->catalogId.tableoid,
1777 AH->currentTE->catalogId.oid,
1778 AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1779 AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1780 AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1781 }
1782 AH->lastErrorStage = AH->stage;
1783 AH->lastErrorTE = AH->currentTE;
1784
1785 va_start(ap, fmt);
1786 pg_log_generic_v(PG_LOG_ERROR, fmt, ap);
1787 va_end(ap);
1788
1789 if (AH->public.exit_on_error)
1790 exit_nicely(1);
1791 else
1792 AH->public.n_errors++;
1793}
1794
#ifdef NOT_USED

/*
 * Relocate te within the doubly-linked TOC list so that it immediately
 * follows pos.  (Compiled out; kept as the counterpart of _moveBefore.)
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *old_prev = te->prev;
	TocEntry   *old_next = te->next;
	TocEntry   *follower;

	/* Detach te from its current neighbours */
	old_prev->next = old_next;
	old_next->prev = old_prev;

	/* Re-attach it between pos and whatever now follows pos */
	follower = pos->next;
	te->prev = pos;
	te->next = follower;
	follower->prev = te;
	pos->next = te;
}
#endif
1811
1812static void
1813_moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1814{
1815 /* Unlink te from list */
1816 te->prev->next = te->next;
1817 te->next->prev = te->prev;
1818
1819 /* and insert it before "pos" */
1820 te->prev = pos->prev;
1821 te->next = pos;
1822 pos->prev->next = te;
1823 pos->prev = te;
1824}
1825
1826/*
1827 * Build index arrays for the TOC list
1828 *
1829 * This should be invoked only after we have created or read in all the TOC
1830 * items.
1831 *
1832 * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1833 * array entries run only up to maxDumpId. We might see dependency dump IDs
1834 * beyond that (if the dump was partial); so always check the array bound
1835 * before trying to touch an array entry.
1836 */
1837static void
1838buildTocEntryArrays(ArchiveHandle *AH)
1839{
1840 DumpId maxDumpId = AH->maxDumpId;
1841 TocEntry *te;
1842
1843 AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1844 AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1845
1846 for (te = AH->toc->next; te != AH->toc; te = te->next)
1847 {
1848 /* this check is purely paranoia, maxDumpId should be correct */
1849 if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1850 fatal("bad dumpId");
1851
1852 /* tocsByDumpId indexes all TOCs by their dump ID */
1853 AH->tocsByDumpId[te->dumpId] = te;
1854
1855 /*
1856 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1857 * TOC entry that has a DATA item. We compute this by reversing the
1858 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1859 * just one dependency and it is the TABLE item.
1860 */
1861 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1862 {
1863 DumpId tableId = te->dependencies[0];
1864
1865 /*
1866 * The TABLE item might not have been in the archive, if this was
1867 * a data-only dump; but its dump ID should be less than its data
1868 * item's dump ID, so there should be a place for it in the array.
1869 */
1870 if (tableId <= 0 || tableId > maxDumpId)
1871 fatal("bad table dumpId for TABLE DATA item");
1872
1873 AH->tableDataId[tableId] = te->dumpId;
1874 }
1875 }
1876}
1877
1878TocEntry *
1879getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1880{
1881 /* build index arrays if we didn't already */
1882 if (AH->tocsByDumpId == NULL)
1883 buildTocEntryArrays(AH);
1884
1885 if (id > 0 && id <= AH->maxDumpId)
1886 return AH->tocsByDumpId[id];
1887
1888 return NULL;
1889}
1890
1891teReqs
1892TocIDRequired(ArchiveHandle *AH, DumpId id)
1893{
1894 TocEntry *te = getTocEntryByDumpId(AH, id);
1895
1896 if (!te)
1897 return 0;
1898
1899 return te->reqs;
1900}
1901
1902size_t
1903WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1904{
1905 int off;
1906
1907 /* Save the flag */
1908 AH->WriteBytePtr(AH, wasSet);
1909
1910 /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1911 for (off = 0; off < sizeof(pgoff_t); off++)
1912 {
1913 AH->WriteBytePtr(AH, o & 0xFF);
1914 o >>= 8;
1915 }
1916 return sizeof(pgoff_t) + 1;
1917}
1918
1919int
1920ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1921{
1922 int i;
1923 int off;
1924 int offsetFlg;
1925
1926 /* Initialize to zero */
1927 *o = 0;
1928
1929 /* Check for old version */
1930 if (AH->version < K_VERS_1_7)
1931 {
1932 /* Prior versions wrote offsets using WriteInt */
1933 i = ReadInt(AH);
1934 /* -1 means not set */
1935 if (i < 0)
1936 return K_OFFSET_POS_NOT_SET;
1937 else if (i == 0)
1938 return K_OFFSET_NO_DATA;
1939
1940 /* Cast to pgoff_t because it was written as an int. */
1941 *o = (pgoff_t) i;
1942 return K_OFFSET_POS_SET;
1943 }
1944
1945 /*
1946 * Read the flag indicating the state of the data pointer. Check if valid
1947 * and die if not.
1948 *
1949 * This used to be handled by a negative or zero pointer, now we use an
1950 * extra byte specifically for the state.
1951 */
1952 offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
1953
1954 switch (offsetFlg)
1955 {
1956 case K_OFFSET_POS_NOT_SET:
1957 case K_OFFSET_NO_DATA:
1958 case K_OFFSET_POS_SET:
1959
1960 break;
1961
1962 default:
1963 fatal("unexpected data offset flag %d", offsetFlg);
1964 }
1965
1966 /*
1967 * Read the bytes
1968 */
1969 for (off = 0; off < AH->offSize; off++)
1970 {
1971 if (off < sizeof(pgoff_t))
1972 *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
1973 else
1974 {
1975 if (AH->ReadBytePtr(AH) != 0)
1976 fatal("file offset in dump file is too large");
1977 }
1978 }
1979
1980 return offsetFlg;
1981}
1982
1983size_t
1984WriteInt(ArchiveHandle *AH, int i)
1985{
1986 int b;
1987
1988 /*
1989 * This is a bit yucky, but I don't want to make the binary format very
1990 * dependent on representation, and not knowing much about it, I write out
1991 * a sign byte. If you change this, don't forget to change the file
1992 * version #, and modify readInt to read the new format AS WELL AS the old
1993 * formats.
1994 */
1995
1996 /* SIGN byte */
1997 if (i < 0)
1998 {
1999 AH->WriteBytePtr(AH, 1);
2000 i = -i;
2001 }
2002 else
2003 AH->WriteBytePtr(AH, 0);
2004
2005 for (b = 0; b < AH->intSize; b++)
2006 {
2007 AH->WriteBytePtr(AH, i & 0xFF);
2008 i >>= 8;
2009 }
2010
2011 return AH->intSize + 1;
2012}
2013
2014int
2015ReadInt(ArchiveHandle *AH)
2016{
2017 int res = 0;
2018 int bv,
2019 b;
2020 int sign = 0; /* Default positive */
2021 int bitShift = 0;
2022
2023 if (AH->version > K_VERS_1_0)
2024 /* Read a sign byte */
2025 sign = AH->ReadBytePtr(AH);
2026
2027 for (b = 0; b < AH->intSize; b++)
2028 {
2029 bv = AH->ReadBytePtr(AH) & 0xFF;
2030 if (bv != 0)
2031 res = res + (bv << bitShift);
2032 bitShift += 8;
2033 }
2034
2035 if (sign)
2036 res = -res;
2037
2038 return res;
2039}
2040
2041size_t
2042WriteStr(ArchiveHandle *AH, const char *c)
2043{
2044 size_t res;
2045
2046 if (c)
2047 {
2048 int len = strlen(c);
2049
2050 res = WriteInt(AH, len);
2051 AH->WriteBufPtr(AH, c, len);
2052 res += len;
2053 }
2054 else
2055 res = WriteInt(AH, -1);
2056
2057 return res;
2058}
2059
2060char *
2061ReadStr(ArchiveHandle *AH)
2062{
2063 char *buf;
2064 int l;
2065
2066 l = ReadInt(AH);
2067 if (l < 0)
2068 buf = NULL;
2069 else
2070 {
2071 buf = (char *) pg_malloc(l + 1);
2072 AH->ReadBufPtr(AH, (void *) buf, l);
2073
2074 buf[l] = '\0';
2075 }
2076
2077 return buf;
2078}
2079
2080static int
2081_discoverArchiveFormat(ArchiveHandle *AH)
2082{
2083 FILE *fh;
2084 char sig[6]; /* More than enough */
2085 size_t cnt;
2086 int wantClose = 0;
2087
2088 pg_log_debug("attempting to ascertain archive format");
2089
2090 if (AH->lookahead)
2091 free(AH->lookahead);
2092
2093 AH->lookaheadSize = 512;
2094 AH->lookahead = pg_malloc0(512);
2095 AH->lookaheadLen = 0;
2096 AH->lookaheadPos = 0;
2097
2098 if (AH->fSpec)
2099 {
2100 struct stat st;
2101
2102 wantClose = 1;
2103
2104 /*
2105 * Check if the specified archive is a directory. If so, check if
2106 * there's a "toc.dat" (or "toc.dat.gz") file in it.
2107 */
2108 if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2109 {
2110 char buf[MAXPGPATH];
2111
2112 if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
2113 fatal("directory name too long: \"%s\"",
2114 AH->fSpec);
2115 if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2116 {
2117 AH->format = archDirectory;
2118 return AH->format;
2119 }
2120
2121#ifdef HAVE_LIBZ
2122 if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
2123 fatal("directory name too long: \"%s\"",
2124 AH->fSpec);
2125 if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2126 {
2127 AH->format = archDirectory;
2128 return AH->format;
2129 }
2130#endif
2131 fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2132 AH->fSpec);
2133 fh = NULL; /* keep compiler quiet */
2134 }
2135 else
2136 {
2137 fh = fopen(AH->fSpec, PG_BINARY_R);
2138 if (!fh)
2139 fatal("could not open input file \"%s\": %m", AH->fSpec);
2140 }
2141 }
2142 else
2143 {
2144 fh = stdin;
2145 if (!fh)
2146 fatal("could not open input file: %m");
2147 }
2148
2149 if ((cnt = fread(sig, 1, 5, fh)) != 5)
2150 {
2151 if (ferror(fh))
2152 fatal("could not read input file: %m");
2153 else
2154 fatal("input file is too short (read %lu, expected 5)",
2155 (unsigned long) cnt);
2156 }
2157
2158 /* Save it, just in case we need it later */
2159 memcpy(&AH->lookahead[0], sig, 5);
2160 AH->lookaheadLen = 5;
2161
2162 if (strncmp(sig, "PGDMP", 5) == 0)
2163 {
2164 int byteread;
2165 char vmaj,
2166 vmin,
2167 vrev;
2168
2169 /*
2170 * Finish reading (most of) a custom-format header.
2171 *
2172 * NB: this code must agree with ReadHead().
2173 */
2174 if ((byteread = fgetc(fh)) == EOF)
2175 READ_ERROR_EXIT(fh);
2176
2177 vmaj = byteread;
2178
2179 if ((byteread = fgetc(fh)) == EOF)
2180 READ_ERROR_EXIT(fh);
2181
2182 vmin = byteread;
2183
2184 /* Save these too... */
2185 AH->lookahead[AH->lookaheadLen++] = vmaj;
2186 AH->lookahead[AH->lookaheadLen++] = vmin;
2187
2188 /* Check header version; varies from V1.0 */
2189 if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
2190 {
2191 if ((byteread = fgetc(fh)) == EOF)
2192 READ_ERROR_EXIT(fh);
2193
2194 vrev = byteread;
2195 AH->lookahead[AH->lookaheadLen++] = vrev;
2196 }
2197 else
2198 vrev = 0;
2199
2200 AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
2201
2202 if ((AH->intSize = fgetc(fh)) == EOF)
2203 READ_ERROR_EXIT(fh);
2204 AH->lookahead[AH->lookaheadLen++] = AH->intSize;
2205
2206 if (AH->version >= K_VERS_1_7)
2207 {
2208 if ((AH->offSize = fgetc(fh)) == EOF)
2209 READ_ERROR_EXIT(fh);
2210 AH->lookahead[AH->lookaheadLen++] = AH->offSize;
2211 }
2212 else
2213 AH->offSize = AH->intSize;
2214
2215 if ((byteread = fgetc(fh)) == EOF)
2216 READ_ERROR_EXIT(fh);
2217
2218 AH->format = byteread;
2219 AH->lookahead[AH->lookaheadLen++] = AH->format;
2220 }
2221 else
2222 {
2223 /*
2224 * *Maybe* we have a tar archive format file or a text dump ... So,
2225 * read first 512 byte header...
2226 */
2227 cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2228 /* read failure is checked below */
2229 AH->lookaheadLen += cnt;
2230
2231 if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2232 (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2233 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2234 {
2235 /*
2236 * looks like it's probably a text format dump. so suggest they
2237 * try psql
2238 */
2239 fatal("input file appears to be a text format dump. Please use psql.");
2240 }
2241
2242 if (AH->lookaheadLen != 512)
2243 {
2244 if (feof(fh))
2245 fatal("input file does not appear to be a valid archive (too short?)");
2246 else
2247 READ_ERROR_EXIT(fh);
2248 }
2249
2250 if (!isValidTarHeader(AH->lookahead))
2251 fatal("input file does not appear to be a valid archive");
2252
2253 AH->format = archTar;
2254 }
2255
2256 /* If we can't seek, then mark the header as read */
2257 if (fseeko(fh, 0, SEEK_SET) != 0)
2258 {
2259 /*
2260 * NOTE: Formats that use the lookahead buffer can unset this in their
2261 * Init routine.
2262 */
2263 AH->readHeader = 1;
2264 }
2265 else
2266 AH->lookaheadLen = 0; /* Don't bother since we've reset the file */
2267
2268 /* Close the file */
2269 if (wantClose)
2270 if (fclose(fh) != 0)
2271 fatal("could not close input file: %m");
2272
2273 return AH->format;
2274}
2275
2276
2277/*
2278 * Allocate an archive handle
2279 */
2280static ArchiveHandle *
2281_allocAH(const char *FileSpec, const ArchiveFormat fmt,
2282 const int compression, bool dosync, ArchiveMode mode,
2283 SetupWorkerPtrType setupWorkerPtr)
2284{
2285 ArchiveHandle *AH;
2286
2287 pg_log_debug("allocating AH for %s, format %d", FileSpec, fmt);
2288
2289 AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2290
2291 AH->version = K_VERS_SELF;
2292
2293 /* initialize for backwards compatible string processing */
2294 AH->public.encoding = 0; /* PG_SQL_ASCII */
2295 AH->public.std_strings = false;
2296
2297 /* sql error handling */
2298 AH->public.exit_on_error = true;
2299 AH->public.n_errors = 0;
2300
2301 AH->archiveDumpVersion = PG_VERSION;
2302
2303 AH->createDate = time(NULL);
2304
2305 AH->intSize = sizeof(int);
2306 AH->offSize = sizeof(pgoff_t);
2307 if (FileSpec)
2308 {
2309 AH->fSpec = pg_strdup(FileSpec);
2310
2311 /*
2312 * Not used; maybe later....
2313 *
2314 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2315 * i--) if (AH->workDir[i-1] == '/')
2316 */
2317 }
2318 else
2319 AH->fSpec = NULL;
2320
2321 AH->currUser = NULL; /* unknown */
2322 AH->currSchema = NULL; /* ditto */
2323 AH->currTablespace = NULL; /* ditto */
2324 AH->currTableAm = NULL; /* ditto */
2325
2326 AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2327
2328 AH->toc->next = AH->toc;
2329 AH->toc->prev = AH->toc;
2330
2331 AH->mode = mode;
2332 AH->compression = compression;
2333 AH->dosync = dosync;
2334
2335 memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2336
2337 /* Open stdout with no compression for AH output handle */
2338 AH->gzOut = 0;
2339 AH->OF = stdout;
2340
2341 /*
2342 * On Windows, we need to use binary mode to read/write non-text files,
2343 * which include all archive formats as well as compressed plain text.
2344 * Force stdin/stdout into binary mode if that is what we are using.
2345 */
2346#ifdef WIN32
2347 if ((fmt != archNull || compression != 0) &&
2348 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2349 {
2350 if (mode == archModeWrite)
2351 _setmode(fileno(stdout), O_BINARY);
2352 else
2353 _setmode(fileno(stdin), O_BINARY);
2354 }
2355#endif
2356
2357 AH->SetupWorkerPtr = setupWorkerPtr;
2358
2359 if (fmt == archUnknown)
2360 AH->format = _discoverArchiveFormat(AH);
2361 else
2362 AH->format = fmt;
2363
2364 AH->promptPassword = TRI_DEFAULT;
2365
2366 switch (AH->format)
2367 {
2368 case archCustom:
2369 InitArchiveFmt_Custom(AH);
2370 break;
2371
2372 case archNull:
2373 InitArchiveFmt_Null(AH);
2374 break;
2375
2376 case archDirectory:
2377 InitArchiveFmt_Directory(AH);
2378 break;
2379
2380 case archTar:
2381 InitArchiveFmt_Tar(AH);
2382 break;
2383
2384 default:
2385 fatal("unrecognized file format \"%d\"", fmt);
2386 }
2387
2388 return AH;
2389}
2390
2391/*
2392 * Write out all data (tables & blobs)
2393 */
2394void
2395WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2396{
2397 TocEntry *te;
2398
2399 if (pstate && pstate->numWorkers > 1)
2400 {
2401 /*
2402 * In parallel mode, this code runs in the master process. We
2403 * construct an array of candidate TEs, then sort it into decreasing
2404 * size order, then dispatch each TE to a data-transfer worker. By
2405 * dumping larger tables first, we avoid getting into a situation
2406 * where we're down to one job and it's big, losing parallelism.
2407 */
2408 TocEntry **tes;
2409 int ntes;
2410
2411 tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2412 ntes = 0;
2413 for (te = AH->toc->next; te != AH->toc; te = te->next)
2414 {
2415 /* Consider only TEs with dataDumper functions ... */
2416 if (!te->dataDumper)
2417 continue;
2418 /* ... and ignore ones not enabled for dump */
2419 if ((te->reqs & REQ_DATA) == 0)
2420 continue;
2421
2422 tes[ntes++] = te;
2423 }
2424
2425 if (ntes > 1)
2426 qsort((void *) tes, ntes, sizeof(TocEntry *),
2427 TocEntrySizeCompare);
2428
2429 for (int i = 0; i < ntes; i++)
2430 DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2431 mark_dump_job_done, NULL);
2432
2433 pg_free(tes);
2434
2435 /* Now wait for workers to finish. */
2436 WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2437 }
2438 else
2439 {
2440 /* Non-parallel mode: just dump all candidate TEs sequentially. */
2441 for (te = AH->toc->next; te != AH->toc; te = te->next)
2442 {
2443 /* Must have same filter conditions as above */
2444 if (!te->dataDumper)
2445 continue;
2446 if ((te->reqs & REQ_DATA) == 0)
2447 continue;
2448
2449 WriteDataChunksForTocEntry(AH, te);
2450 }
2451 }
2452}
2453
2454
2455/*
2456 * Callback function that's invoked in the master process after a step has
2457 * been parallel dumped.
2458 *
2459 * We don't need to do anything except check for worker failure.
2460 */
2461static void
2462mark_dump_job_done(ArchiveHandle *AH,
2463 TocEntry *te,
2464 int status,
2465 void *callback_data)
2466{
2467 pg_log_info("finished item %d %s %s",
2468 te->dumpId, te->desc, te->tag);
2469
2470 if (status != 0)
2471 fatal("worker process failed: exit code %d",
2472 status);
2473}
2474
2475
2476void
2477WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2478{
2479 StartDataPtrType startPtr;
2480 EndDataPtrType endPtr;
2481
2482 AH->currToc = te;
2483
2484 if (strcmp(te->desc, "BLOBS") == 0)
2485 {
2486 startPtr = AH->StartBlobsPtr;
2487 endPtr = AH->EndBlobsPtr;
2488 }
2489 else
2490 {
2491 startPtr = AH->StartDataPtr;
2492 endPtr = AH->EndDataPtr;
2493 }
2494
2495 if (startPtr != NULL)
2496 (*startPtr) (AH, te);
2497
2498 /*
2499 * The user-provided DataDumper routine needs to call AH->WriteData
2500 */
2501 te->dataDumper((Archive *) AH, te->dataDumperArg);
2502
2503 if (endPtr != NULL)
2504 (*endPtr) (AH, te);
2505
2506 AH->currToc = NULL;
2507}
2508
2509void
2510WriteToc(ArchiveHandle *AH)
2511{
2512 TocEntry *te;
2513 char workbuf[32];
2514 int tocCount;
2515 int i;
2516
2517 /* count entries that will actually be dumped */
2518 tocCount = 0;
2519 for (te = AH->toc->next; te != AH->toc; te = te->next)
2520 {
2521 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2522 tocCount++;
2523 }
2524
2525 /* printf("%d TOC Entries to save\n", tocCount); */
2526
2527 WriteInt(AH, tocCount);
2528
2529 for (te = AH->toc->next; te != AH->toc; te = te->next)
2530 {
2531 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2532 continue;
2533
2534 WriteInt(AH, te->dumpId);
2535 WriteInt(AH, te->dataDumper ? 1 : 0);
2536
2537 /* OID is recorded as a string for historical reasons */
2538 sprintf(workbuf, "%u", te->catalogId.tableoid);
2539 WriteStr(AH, workbuf);
2540 sprintf(workbuf, "%u", te->catalogId.oid);
2541 WriteStr(AH, workbuf);
2542
2543 WriteStr(AH, te->tag);
2544 WriteStr(AH, te->desc);
2545 WriteInt(AH, te->section);
2546 WriteStr(AH, te->defn);
2547 WriteStr(AH, te->dropStmt);
2548 WriteStr(AH, te->copyStmt);
2549 WriteStr(AH, te->namespace);
2550 WriteStr(AH, te->tablespace);
2551 WriteStr(AH, te->tableam);
2552 WriteStr(AH, te->owner);
2553 WriteStr(AH, "false");
2554
2555 /* Dump list of dependencies */
2556 for (i = 0; i < te->nDeps; i++)
2557 {
2558 sprintf(workbuf, "%d", te->dependencies[i]);
2559 WriteStr(AH, workbuf);
2560 }
2561 WriteStr(AH, NULL); /* Terminate List */
2562
2563 if (AH->WriteExtraTocPtr)
2564 AH->WriteExtraTocPtr(AH, te);
2565 }
2566}
2567
2568void
2569ReadToc(ArchiveHandle *AH)
2570{
2571 int i;
2572 char *tmp;
2573 DumpId *deps;
2574 int depIdx;
2575 int depSize;
2576 TocEntry *te;
2577
2578 AH->tocCount = ReadInt(AH);
2579 AH->maxDumpId = 0;
2580
2581 for (i = 0; i < AH->tocCount; i++)
2582 {
2583 te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2584 te->dumpId = ReadInt(AH);
2585
2586 if (te->dumpId > AH->maxDumpId)
2587 AH->maxDumpId = te->dumpId;
2588
2589 /* Sanity check */
2590 if (te->dumpId <= 0)
2591 fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2592 te->dumpId);
2593
2594 te->hadDumper = ReadInt(AH);
2595
2596 if (AH->version >= K_VERS_1_8)
2597 {
2598 tmp = ReadStr(AH);
2599 sscanf(tmp, "%u", &te->catalogId.tableoid);
2600 free(tmp);
2601 }
2602 else
2603 te->catalogId.tableoid = InvalidOid;
2604 tmp = ReadStr(AH);
2605 sscanf(tmp, "%u", &te->catalogId.oid);
2606 free(tmp);
2607
2608 te->tag = ReadStr(AH);
2609 te->desc = ReadStr(AH);
2610
2611 if (AH->version >= K_VERS_1_11)
2612 {
2613 te->section = ReadInt(AH);
2614 }
2615 else
2616 {
2617 /*
2618 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2619 * the entries into sections. This list need not cover entry
2620 * types added later than 8.4.
2621 */
2622 if (strcmp(te->desc, "COMMENT") == 0 ||
2623 strcmp(te->desc, "ACL") == 0 ||
2624 strcmp(te->desc, "ACL LANGUAGE") == 0)
2625 te->section = SECTION_NONE;
2626 else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2627 strcmp(te->desc, "BLOBS") == 0 ||
2628 strcmp(te->desc, "BLOB COMMENTS") == 0)
2629 te->section = SECTION_DATA;
2630 else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2631 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2632 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2633 strcmp(te->desc, "INDEX") == 0 ||
2634 strcmp(te->desc, "RULE") == 0 ||
2635 strcmp(te->desc, "TRIGGER") == 0)
2636 te->section = SECTION_POST_DATA;
2637 else
2638 te->section = SECTION_PRE_DATA;
2639 }
2640
2641 te->defn = ReadStr(AH);
2642 te->dropStmt = ReadStr(AH);
2643
2644 if (AH->version >= K_VERS_1_3)
2645 te->copyStmt = ReadStr(AH);
2646
2647 if (AH->version >= K_VERS_1_6)
2648 te->namespace = ReadStr(AH);
2649
2650 if (AH->version >= K_VERS_1_10)
2651 te->tablespace = ReadStr(AH);
2652
2653 if (AH->version >= K_VERS_1_14)
2654 te->tableam = ReadStr(AH);
2655
2656 te->owner = ReadStr(AH);
2657 if (AH->version < K_VERS_1_9 || strcmp(ReadStr(AH), "true") == 0)
2658 pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2659
2660 /* Read TOC entry dependencies */
2661 if (AH->version >= K_VERS_1_5)
2662 {
2663 depSize = 100;
2664 deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2665 depIdx = 0;
2666 for (;;)
2667 {
2668 tmp = ReadStr(AH);
2669 if (!tmp)
2670 break; /* end of list */
2671 if (depIdx >= depSize)
2672 {
2673 depSize *= 2;
2674 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2675 }
2676 sscanf(tmp, "%d", &deps[depIdx]);
2677 free(tmp);
2678 depIdx++;
2679 }
2680
2681 if (depIdx > 0) /* We have a non-null entry */
2682 {
2683 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2684 te->dependencies = deps;
2685 te->nDeps = depIdx;
2686 }
2687 else
2688 {
2689 free(deps);
2690 te->dependencies = NULL;
2691 te->nDeps = 0;
2692 }
2693 }
2694 else
2695 {
2696 te->dependencies = NULL;
2697 te->nDeps = 0;
2698 }
2699 te->dataLength = 0;
2700
2701 if (AH->ReadExtraTocPtr)
2702 AH->ReadExtraTocPtr(AH, te);
2703
2704 pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2705 i, te->dumpId, te->desc, te->tag);
2706
2707 /* link completed entry into TOC circular list */
2708 te->prev = AH->toc->prev;
2709 AH->toc->prev->next = te;
2710 AH->toc->prev = te;
2711 te->next = AH->toc;
2712
2713 /* special processing immediately upon read for some items */
2714 if (strcmp(te->desc, "ENCODING") == 0)
2715 processEncodingEntry(AH, te);
2716 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2717 processStdStringsEntry(AH, te);
2718 else if (strcmp(te->desc, "SEARCHPATH") == 0)
2719 processSearchPathEntry(AH, te);
2720 }
2721}
2722
2723static void
2724processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2725{
2726 /* te->defn should have the form SET client_encoding = 'foo'; */
2727 char *defn = pg_strdup(te->defn);
2728 char *ptr1;
2729 char *ptr2 = NULL;
2730 int encoding;
2731
2732 ptr1 = strchr(defn, '\'');
2733 if (ptr1)
2734 ptr2 = strchr(++ptr1, '\'');
2735 if (ptr2)
2736 {
2737 *ptr2 = '\0';
2738 encoding = pg_char_to_encoding(ptr1);
2739 if (encoding < 0)
2740 fatal("unrecognized encoding \"%s\"",
2741 ptr1);
2742 AH->public.encoding = encoding;
2743 }
2744 else
2745 fatal("invalid ENCODING item: %s",
2746 te->defn);
2747
2748 free(defn);
2749}
2750
2751static void
2752processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2753{
2754 /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2755 char *ptr1;
2756
2757 ptr1 = strchr(te->defn, '\'');
2758 if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2759 AH->public.std_strings = true;
2760 else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2761 AH->public.std_strings = false;
2762 else
2763 fatal("invalid STDSTRINGS item: %s",
2764 te->defn);
2765}
2766
2767static void
2768processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2769{
2770 /*
2771 * te->defn should contain a command to set search_path. We just copy it
2772 * verbatim for use later.
2773 */
2774 AH->public.searchpath = pg_strdup(te->defn);
2775}
2776
2777static void
2778StrictNamesCheck(RestoreOptions *ropt)
2779{
2780 const char *missing_name;
2781
2782 Assert(ropt->strict_names);
2783
2784 if (ropt->schemaNames.head != NULL)
2785 {
2786 missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2787 if (missing_name != NULL)
2788 fatal("schema \"%s\" not found", missing_name);
2789 }
2790
2791 if (ropt->tableNames.head != NULL)
2792 {
2793 missing_name = simple_string_list_not_touched(&ropt->tableNames);
2794 if (missing_name != NULL)
2795 fatal("table \"%s\" not found", missing_name);
2796 }
2797
2798 if (ropt->indexNames.head != NULL)
2799 {
2800 missing_name = simple_string_list_not_touched(&ropt->indexNames);
2801 if (missing_name != NULL)
2802 fatal("index \"%s\" not found", missing_name);
2803 }
2804
2805 if (ropt->functionNames.head != NULL)
2806 {
2807 missing_name = simple_string_list_not_touched(&ropt->functionNames);
2808 if (missing_name != NULL)
2809 fatal("function \"%s\" not found", missing_name);
2810 }
2811
2812 if (ropt->triggerNames.head != NULL)
2813 {
2814 missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2815 if (missing_name != NULL)
2816 fatal("trigger \"%s\" not found", missing_name);
2817 }
2818}
2819
2820/*
2821 * Determine whether we want to restore this TOC entry.
2822 *
2823 * Returns 0 if entry should be skipped, or some combination of the
2824 * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2825 * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2826 */
2827static teReqs
2828_tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2829{
2830 teReqs res = REQ_SCHEMA | REQ_DATA;
2831 RestoreOptions *ropt = AH->public.ropt;
2832
2833 /* These items are treated specially */
2834 if (strcmp(te->desc, "ENCODING") == 0 ||
2835 strcmp(te->desc, "STDSTRINGS") == 0 ||
2836 strcmp(te->desc, "SEARCHPATH") == 0)
2837 return REQ_SPECIAL;
2838
2839 /*
2840 * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2841 * restored in createDB mode, and not restored otherwise, independently of
2842 * all else.
2843 */
2844 if (strcmp(te->desc, "DATABASE") == 0 ||
2845 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2846 {
2847 if (ropt->createDB)
2848 return REQ_SCHEMA;
2849 else
2850 return 0;
2851 }
2852
2853 /*
2854 * Process exclusions that affect certain classes of TOC entries.
2855 */
2856
2857 /* If it's an ACL, maybe ignore it */
2858 if (ropt->aclsSkip && _tocEntryIsACL(te))
2859 return 0;
2860
2861 /* If it's a comment, maybe ignore it */
2862 if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2863 return 0;
2864
2865 /*
2866 * If it's a publication or a table part of a publication, maybe ignore
2867 * it.
2868 */
2869 if (ropt->no_publications &&
2870 (strcmp(te->desc, "PUBLICATION") == 0 ||
2871 strcmp(te->desc, "PUBLICATION TABLE") == 0))
2872 return 0;
2873
2874 /* If it's a security label, maybe ignore it */
2875 if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2876 return 0;
2877
2878 /* If it's a subscription, maybe ignore it */
2879 if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2880 return 0;
2881
2882 /* Ignore it if section is not to be dumped/restored */
2883 switch (curSection)
2884 {
2885 case SECTION_PRE_DATA:
2886 if (!(ropt->dumpSections & DUMP_PRE_DATA))
2887 return 0;
2888 break;
2889 case SECTION_DATA:
2890 if (!(ropt->dumpSections & DUMP_DATA))
2891 return 0;
2892 break;
2893 case SECTION_POST_DATA:
2894 if (!(ropt->dumpSections & DUMP_POST_DATA))
2895 return 0;
2896 break;
2897 default:
2898 /* shouldn't get here, really, but ignore it */
2899 return 0;
2900 }
2901
2902 /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2903 if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2904 return 0;
2905
2906 /*
2907 * Check options for selective dump/restore.
2908 */
2909 if (strcmp(te->desc, "ACL") == 0 ||
2910 strcmp(te->desc, "COMMENT") == 0 ||
2911 strcmp(te->desc, "SECURITY LABEL") == 0)
2912 {
2913 /* Database properties react to createDB, not selectivity options. */
2914 if (strncmp(te->tag, "DATABASE ", 9) == 0)
2915 {
2916 if (!ropt->createDB)
2917 return 0;
2918 }
2919 else if (ropt->schemaNames.head != NULL ||
2920 ropt->schemaExcludeNames.head != NULL ||
2921 ropt->selTypes)
2922 {
2923 /*
2924 * In a selective dump/restore, we want to restore these dependent
2925 * TOC entry types only if their parent object is being restored.
2926 * Without selectivity options, we let through everything in the
2927 * archive. Note there may be such entries with no parent, eg
2928 * non-default ACLs for built-in objects.
2929 *
2930 * This code depends on the parent having been marked already,
2931 * which should be the case; if it isn't, perhaps due to
2932 * SortTocFromFile rearrangement, skipping the dependent entry
2933 * seems prudent anyway.
2934 *
2935 * Ideally we'd handle, eg, table CHECK constraints this way too.
2936 * But it's hard to tell which of their dependencies is the one to
2937 * consult.
2938 */
2939 if (te->nDeps != 1 ||
2940 TocIDRequired(AH, te->dependencies[0]) == 0)
2941 return 0;
2942 }
2943 }
2944 else
2945 {
2946 /* Apply selective-restore rules for standalone TOC entries. */
2947 if (ropt->schemaNames.head != NULL)
2948 {
2949 /* If no namespace is specified, it means all. */
2950 if (!te->namespace)
2951 return 0;
2952 if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
2953 return 0;
2954 }
2955
2956 if (ropt->schemaExcludeNames.head != NULL &&
2957 te->namespace &&
2958 simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
2959 return 0;
2960
2961 if (ropt->selTypes)
2962 {
2963 if (strcmp(te->desc, "TABLE") == 0 ||
2964 strcmp(te->desc, "TABLE DATA") == 0 ||
2965 strcmp(te->desc, "VIEW") == 0 ||
2966 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
2967 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
2968 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
2969 strcmp(te->desc, "SEQUENCE") == 0 ||
2970 strcmp(te->desc, "SEQUENCE SET") == 0)
2971 {
2972 if (!ropt->selTable)
2973 return 0;
2974 if (ropt->tableNames.head != NULL &&
2975 !simple_string_list_member(&ropt->tableNames, te->tag))
2976 return 0;
2977 }
2978 else if (strcmp(te->desc, "INDEX") == 0)
2979 {
2980 if (!ropt->selIndex)
2981 return 0;
2982 if (ropt->indexNames.head != NULL &&
2983 !simple_string_list_member(&ropt->indexNames, te->tag))
2984 return 0;
2985 }
2986 else if (strcmp(te->desc, "FUNCTION") == 0 ||
2987 strcmp(te->desc, "AGGREGATE") == 0 ||
2988 strcmp(te->desc, "PROCEDURE") == 0)
2989 {
2990 if (!ropt->selFunction)
2991 return 0;
2992 if (ropt->functionNames.head != NULL &&
2993 !simple_string_list_member(&ropt->functionNames, te->tag))
2994 return 0;
2995 }
2996 else if (strcmp(te->desc, "TRIGGER") == 0)
2997 {
2998 if (!ropt->selTrigger)
2999 return 0;
3000 if (ropt->triggerNames.head != NULL &&
3001 !simple_string_list_member(&ropt->triggerNames, te->tag))
3002 return 0;
3003 }
3004 else
3005 return 0;
3006 }
3007 }
3008
3009 /*
3010 * Determine whether the TOC entry contains schema and/or data components,
3011 * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3012 * it's both schema and data. Otherwise it's probably schema-only, but
3013 * there are exceptions.
3014 */
3015 if (!te->hadDumper)
3016 {
3017 /*
3018 * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
3019 * it is considered a data entry. We don't need to check for the
3020 * BLOBS entry or old-style BLOB COMMENTS, because they will have
3021 * hadDumper = true ... but we do need to check new-style BLOB ACLs,
3022 * comments, etc.
3023 */
3024 if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3025 strcmp(te->desc, "BLOB") == 0 ||
3026 (strcmp(te->desc, "ACL") == 0 &&
3027 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3028 (strcmp(te->desc, "COMMENT") == 0 &&
3029 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3030 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3031 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
3032 res = res & REQ_DATA;
3033 else
3034 res = res & ~REQ_DATA;
3035 }
3036
3037 /* If there's no definition command, there's no schema component */
3038 if (!te->defn || !te->defn[0])
3039 res = res & ~REQ_SCHEMA;
3040
3041 /*
3042 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3043 * always ignore it.
3044 */
3045 if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3046 return 0;
3047
3048 /* Mask it if we only want schema */
3049 if (ropt->schemaOnly)
3050 {
3051 /*
3052 * The sequence_data option overrides schemaOnly for SEQUENCE SET.
3053 *
3054 * In binary-upgrade mode, even with schemaOnly set, we do not mask
3055 * out large objects. (Only large object definitions, comments and
3056 * other metadata should be generated in binary-upgrade mode, not the
3057 * actual data, but that need not concern us here.)
3058 */
3059 if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3060 !(ropt->binary_upgrade &&
3061 (strcmp(te->desc, "BLOB") == 0 ||
3062 (strcmp(te->desc, "ACL") == 0 &&
3063 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3064 (strcmp(te->desc, "COMMENT") == 0 &&
3065 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3066 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3067 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
3068 res = res & REQ_SCHEMA;
3069 }
3070
3071 /* Mask it if we only want data */
3072 if (ropt->dataOnly)
3073 res = res & REQ_DATA;
3074
3075 return res;
3076}
3077
3078/*
3079 * Identify which pass we should restore this TOC entry in.
3080 *
3081 * See notes with the RestorePass typedef in pg_backup_archiver.h.
3082 */
3083static RestorePass
3084_tocEntryRestorePass(TocEntry *te)
3085{
3086 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3087 if (strcmp(te->desc, "ACL") == 0 ||
3088 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3089 strcmp(te->desc, "DEFAULT ACL") == 0)
3090 return RESTORE_PASS_ACL;
3091 if (strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3092 return RESTORE_PASS_REFRESH;
3093 return RESTORE_PASS_MAIN;
3094}
3095
3096/*
3097 * Identify TOC entries that are ACLs.
3098 *
3099 * Note: it seems worth duplicating some code here to avoid a hard-wired
3100 * assumption that these are exactly the same entries that we restore during
3101 * the RESTORE_PASS_ACL phase.
3102 */
3103static bool
3104_tocEntryIsACL(TocEntry *te)
3105{
3106 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3107 if (strcmp(te->desc, "ACL") == 0 ||
3108 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3109 strcmp(te->desc, "DEFAULT ACL") == 0)
3110 return true;
3111 return false;
3112}
3113
3114/*
3115 * Issue SET commands for parameters that we want to have set the same way
3116 * at all times during execution of a restore script.
3117 */
3118static void
3119_doSetFixedOutputState(ArchiveHandle *AH)
3120{
3121 RestoreOptions *ropt = AH->public.ropt;
3122
3123 /*
3124 * Disable timeouts to allow for slow commands, idle parallel workers, etc
3125 */
3126 ahprintf(AH, "SET statement_timeout = 0;\n");
3127 ahprintf(AH, "SET lock_timeout = 0;\n");
3128 ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3129
3130 /* Select the correct character set encoding */
3131 ahprintf(AH, "SET client_encoding = '%s';\n",
3132 pg_encoding_to_char(AH->public.encoding));
3133
3134 /* Select the correct string literal syntax */
3135 ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3136 AH->public.std_strings ? "on" : "off");
3137
3138 /* Select the role to be used during restore */
3139 if (ropt && ropt->use_role)
3140 ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3141
3142 /* Select the dump-time search_path */
3143 if (AH->public.searchpath)
3144 ahprintf(AH, "%s", AH->public.searchpath);
3145
3146 /* Make sure function checking is disabled */
3147 ahprintf(AH, "SET check_function_bodies = false;\n");
3148
3149 /* Ensure that all valid XML data will be accepted */
3150 ahprintf(AH, "SET xmloption = content;\n");
3151
3152 /* Avoid annoying notices etc */
3153 ahprintf(AH, "SET client_min_messages = warning;\n");
3154 if (!AH->public.std_strings)
3155 ahprintf(AH, "SET escape_string_warning = off;\n");
3156
3157 /* Adjust row-security state */
3158 if (ropt && ropt->enable_row_security)
3159 ahprintf(AH, "SET row_security = on;\n");
3160 else
3161 ahprintf(AH, "SET row_security = off;\n");
3162
3163 ahprintf(AH, "\n");
3164}
3165
3166/*
3167 * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3168 * for updating state if appropriate. If user is NULL or an empty string,
3169 * the specification DEFAULT will be used.
3170 */
3171static void
3172_doSetSessionAuth(ArchiveHandle *AH, const char *user)
3173{
3174 PQExpBuffer cmd = createPQExpBuffer();
3175
3176 appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3177
3178 /*
3179 * SQL requires a string literal here. Might as well be correct.
3180 */
3181 if (user && *user)
3182 appendStringLiteralAHX(cmd, user, AH);
3183 else
3184 appendPQExpBufferStr(cmd, "DEFAULT");
3185 appendPQExpBufferChar(cmd, ';');
3186
3187 if (RestoringToDB(AH))
3188 {
3189 PGresult *res;
3190
3191 res = PQexec(AH->connection, cmd->data);
3192
3193 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3194 /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3195 fatal("could not set session user to \"%s\": %s",
3196 user, PQerrorMessage(AH->connection));
3197
3198 PQclear(res);
3199 }
3200 else
3201 ahprintf(AH, "%s\n\n", cmd->data);
3202
3203 destroyPQExpBuffer(cmd);
3204}
3205
3206
3207/*
3208 * Issue the commands to connect to the specified database.
3209 *
3210 * If we're currently restoring right into a database, this will
3211 * actually establish a connection. Otherwise it puts a \connect into
3212 * the script output.
3213 *
3214 * NULL dbname implies reconnecting to the current DB (pretty useless).
3215 */
3216static void
3217_reconnectToDB(ArchiveHandle *AH, const char *dbname)
3218{
3219 if (RestoringToDB(AH))
3220 ReconnectToServer(AH, dbname, NULL);
3221 else
3222 {
3223 if (dbname)
3224 {
3225 PQExpBufferData connectbuf;
3226
3227 initPQExpBuffer(&connectbuf);
3228 appendPsqlMetaConnect(&connectbuf, dbname);
3229 ahprintf(AH, "%s\n", connectbuf.data);
3230 termPQExpBuffer(&connectbuf);
3231 }
3232 else
3233 ahprintf(AH, "%s\n", "\\connect -\n");
3234 }
3235
3236 /*
3237 * NOTE: currUser keeps track of what the imaginary session user in our
3238 * script is. It's now effectively reset to the original userID.
3239 */
3240 if (AH->currUser)
3241 free(AH->currUser);
3242 AH->currUser = NULL;
3243
3244 /* don't assume we still know the output schema, tablespace, etc either */
3245 if (AH->currSchema)
3246 free(AH->currSchema);
3247 AH->currSchema = NULL;
3248 if (AH->currTablespace)
3249 free(AH->currTablespace);
3250 AH->currTablespace = NULL;
3251
3252 /* re-establish fixed state */
3253 _doSetFixedOutputState(AH);
3254}
3255
3256/*
3257 * Become the specified user, and update state to avoid redundant commands
3258 *
3259 * NULL or empty argument is taken to mean restoring the session default
3260 */
3261static void
3262_becomeUser(ArchiveHandle *AH, const char *user)
3263{
3264 if (!user)
3265 user = ""; /* avoid null pointers */
3266
3267 if (AH->currUser && strcmp(AH->currUser, user) == 0)
3268 return; /* no need to do anything */
3269
3270 _doSetSessionAuth(AH, user);
3271
3272 /*
3273 * NOTE: currUser keeps track of what the imaginary session user in our
3274 * script is
3275 */
3276 if (AH->currUser)
3277 free(AH->currUser);
3278 AH->currUser = pg_strdup(user);
3279}
3280
3281/*
3282 * Become the owner of the given TOC entry object. If
3283 * changes in ownership are not allowed, this doesn't do anything.
3284 */
3285static void
3286_becomeOwner(ArchiveHandle *AH, TocEntry *te)
3287{
3288 RestoreOptions *ropt = AH->public.ropt;
3289
3290 if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3291 return;
3292
3293 _becomeUser(AH, te->owner);
3294}
3295
3296
3297/*
3298 * Issue the commands to select the specified schema as the current schema
3299 * in the target database.
3300 */
3301static void
3302_selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3303{
3304 PQExpBuffer qry;
3305
3306 /*
3307 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3308 * that search_path rather than switching to entry-specific paths.
3309 * Otherwise, it's an old archive that will not restore correctly unless
3310 * we set the search_path as it's expecting.
3311 */
3312 if (AH->public.searchpath)
3313 return;
3314
3315 if (!schemaName || *schemaName == '\0' ||
3316 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3317 return; /* no need to do anything */
3318
3319 qry = createPQExpBuffer();
3320
3321 appendPQExpBuffer(qry, "SET search_path = %s",
3322 fmtId(schemaName));
3323 if (strcmp(schemaName, "pg_catalog") != 0)
3324 appendPQExpBufferStr(qry, ", pg_catalog");
3325
3326 if (RestoringToDB(AH))
3327 {
3328 PGresult *res;
3329
3330 res = PQexec(AH->connection, qry->data);
3331
3332 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3333 warn_or_exit_horribly(AH,
3334 "could not set search_path to \"%s\": %s",
3335 schemaName, PQerrorMessage(AH->connection));
3336
3337 PQclear(res);
3338 }
3339 else
3340 ahprintf(AH, "%s;\n\n", qry->data);
3341
3342 if (AH->currSchema)
3343 free(AH->currSchema);
3344 AH->currSchema = pg_strdup(schemaName);
3345
3346 destroyPQExpBuffer(qry);
3347}
3348
3349/*
3350 * Issue the commands to select the specified tablespace as the current one
3351 * in the target database.
3352 */
3353static void
3354_selectTablespace(ArchiveHandle *AH, const char *tablespace)
3355{
3356 RestoreOptions *ropt = AH->public.ropt;
3357 PQExpBuffer qry;
3358 const char *want,
3359 *have;
3360
3361 /* do nothing in --no-tablespaces mode */
3362 if (ropt->noTablespace)
3363 return;
3364
3365 have = AH->currTablespace;
3366 want = tablespace;
3367
3368 /* no need to do anything for non-tablespace object */
3369 if (!want)
3370 return;
3371
3372 if (have && strcmp(want, have) == 0)
3373 return; /* no need to do anything */
3374
3375 qry = createPQExpBuffer();
3376
3377 if (strcmp(want, "") == 0)
3378 {
3379 /* We want the tablespace to be the database's default */
3380 appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3381 }
3382 else
3383 {
3384 /* We want an explicit tablespace */
3385 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3386 }
3387
3388 if (RestoringToDB(AH))
3389 {
3390 PGresult *res;
3391
3392 res = PQexec(AH->connection, qry->data);
3393
3394 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3395 warn_or_exit_horribly(AH,
3396 "could not set default_tablespace to %s: %s",
3397 fmtId(want), PQerrorMessage(AH->connection));
3398
3399 PQclear(res);
3400 }
3401 else
3402 ahprintf(AH, "%s;\n\n", qry->data);
3403
3404 if (AH->currTablespace)
3405 free(AH->currTablespace);
3406 AH->currTablespace = pg_strdup(want);
3407
3408 destroyPQExpBuffer(qry);
3409}
3410
3411/*
3412 * Set the proper default_table_access_method value for the table.
3413 */
3414static void
3415_selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3416{
3417 PQExpBuffer cmd;
3418 const char *want,
3419 *have;
3420
3421 have = AH->currTableAm;
3422 want = tableam;
3423
3424 if (!want)
3425 return;
3426
3427 if (have && strcmp(want, have) == 0)
3428 return;
3429
3430 cmd = createPQExpBuffer();
3431 appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3432
3433 if (RestoringToDB(AH))
3434 {
3435 PGresult *res;
3436
3437 res = PQexec(AH->connection, cmd->data);
3438
3439 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3440 warn_or_exit_horribly(AH,
3441 "could not set default_table_access_method: %s",
3442 PQerrorMessage(AH->connection));
3443
3444 PQclear(res);
3445 }
3446 else
3447 ahprintf(AH, "%s\n\n", cmd->data);
3448
3449 destroyPQExpBuffer(cmd);
3450
3451 AH->currTableAm = pg_strdup(want);
3452}
3453
3454/*
3455 * Extract an object description for a TOC entry, and append it to buf.
3456 *
3457 * This is used for ALTER ... OWNER TO.
3458 */
3459static void
3460_getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
3461{
3462 const char *type = te->desc;
3463
3464 /* Use ALTER TABLE for views and sequences */
3465 if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
3466 strcmp(type, "MATERIALIZED VIEW") == 0)
3467 type = "TABLE";
3468
3469 /* objects that don't require special decoration */
3470 if (strcmp(type, "COLLATION") == 0 ||
3471 strcmp(type, "CONVERSION") == 0 ||
3472 strcmp(type, "DOMAIN") == 0 ||
3473 strcmp(type, "TABLE") == 0 ||
3474 strcmp(type, "TYPE") == 0 ||
3475 strcmp(type, "FOREIGN TABLE") == 0 ||
3476 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3477 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3478 strcmp(type, "STATISTICS") == 0 ||
3479 /* non-schema-specified objects */
3480 strcmp(type, "DATABASE") == 0 ||
3481 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3482 strcmp(type, "SCHEMA") == 0 ||
3483 strcmp(type, "EVENT TRIGGER") == 0 ||
3484 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3485 strcmp(type, "SERVER") == 0 ||
3486 strcmp(type, "PUBLICATION") == 0 ||
3487 strcmp(type, "SUBSCRIPTION") == 0 ||
3488 strcmp(type, "USER MAPPING") == 0)
3489 {
3490 appendPQExpBuffer(buf, "%s ", type);
3491 if (te->namespace && *te->namespace)
3492 appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3493 appendPQExpBufferStr(buf, fmtId(te->tag));
3494 return;
3495 }
3496
3497 /* BLOBs just have a name, but it's numeric so must not use fmtId */
3498 if (strcmp(type, "BLOB") == 0)
3499 {
3500 appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3501 return;
3502 }
3503
3504 /*
3505 * These object types require additional decoration. Fortunately, the
3506 * information needed is exactly what's in the DROP command.
3507 */
3508 if (strcmp(type, "AGGREGATE") == 0 ||
3509 strcmp(type, "FUNCTION") == 0 ||
3510 strcmp(type, "OPERATOR") == 0 ||
3511 strcmp(type, "OPERATOR CLASS") == 0 ||
3512 strcmp(type, "OPERATOR FAMILY") == 0 ||
3513 strcmp(type, "PROCEDURE") == 0)
3514 {
3515 /* Chop "DROP " off the front and make a modifiable copy */
3516 char *first = pg_strdup(te->dropStmt + 5);
3517 char *last;
3518
3519 /* point to last character in string */
3520 last = first + strlen(first) - 1;
3521
3522 /* Strip off any ';' or '\n' at the end */
3523 while (last >= first && (*last == '\n' || *last == ';'))
3524 last--;
3525 *(last + 1) = '\0';
3526
3527 appendPQExpBufferStr(buf, first);
3528
3529 free(first);
3530 return;
3531 }
3532
3533 pg_log_warning("don't know how to set owner for object type \"%s\"",
3534 type);
3535}
3536
3537/*
3538 * Emit the SQL commands to create the object represented by a TOC entry
3539 *
3540 * This now also includes issuing an ALTER OWNER command to restore the
3541 * object's ownership, if wanted. But note that the object's permissions
3542 * will remain at default, until the matching ACL TOC entry is restored.
3543 */
3544static void
3545_printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3546{
3547 RestoreOptions *ropt = AH->public.ropt;
3548
3549 /* Select owner, schema, tablespace and default AM as necessary */
3550 _becomeOwner(AH, te);
3551 _selectOutputSchema(AH, te->namespace);
3552 _selectTablespace(AH, te->tablespace);
3553 _selectTableAccessMethod(AH, te->tableam);
3554
3555 /* Emit header comment for item */
3556 if (!AH->noTocComments)
3557 {
3558 const char *pfx;
3559 char *sanitized_name;
3560 char *sanitized_schema;
3561 char *sanitized_owner;
3562
3563 if (isData)
3564 pfx = "Data for ";
3565 else
3566 pfx = "";
3567
3568 ahprintf(AH, "--\n");
3569 if (AH->public.verbose)
3570 {
3571 ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3572 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3573 if (te->nDeps > 0)
3574 {
3575 int i;
3576
3577 ahprintf(AH, "-- Dependencies:");
3578 for (i = 0; i < te->nDeps; i++)
3579 ahprintf(AH, " %d", te->dependencies[i]);
3580 ahprintf(AH, "\n");
3581 }
3582 }
3583
3584 sanitized_name = sanitize_line(te->tag, false);
3585 sanitized_schema = sanitize_line(te->namespace, true);
3586 sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3587
3588 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3589 pfx, sanitized_name, te->desc, sanitized_schema,
3590 sanitized_owner);
3591
3592 free(sanitized_name);
3593 free(sanitized_schema);
3594 free(sanitized_owner);
3595
3596 if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3597 {
3598 char *sanitized_tablespace;
3599
3600 sanitized_tablespace = sanitize_line(te->tablespace, false);
3601 ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3602 free(sanitized_tablespace);
3603 }
3604 ahprintf(AH, "\n");
3605
3606 if (AH->PrintExtraTocPtr != NULL)
3607 AH->PrintExtraTocPtr(AH, te);
3608 ahprintf(AH, "--\n\n");
3609 }
3610
3611 /*
3612 * Actually print the definition.
3613 *
3614 * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
3615 * versions put into CREATE SCHEMA. We have to do this when --no-owner
3616 * mode is selected. This is ugly, but I see no other good way ...
3617 */
3618 if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
3619 {
3620 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3621 }
3622 else
3623 {
3624 if (te->defn && strlen(te->defn) > 0)
3625 ahprintf(AH, "%s\n\n", te->defn);
3626 }
3627
3628 /*
3629 * If we aren't using SET SESSION AUTH to determine ownership, we must
3630 * instead issue an ALTER OWNER command. We assume that anything without
3631 * a DROP command is not a separately ownable object. All the categories
3632 * with DROP commands must appear in one list or the other.
3633 */
3634 if (!ropt->noOwner && !ropt->use_setsessauth &&
3635 te->owner && strlen(te->owner) > 0 &&
3636 te->dropStmt && strlen(te->dropStmt) > 0)
3637 {
3638 if (strcmp(te->desc, "AGGREGATE") == 0 ||
3639 strcmp(te->desc, "BLOB") == 0 ||
3640 strcmp(te->desc, "COLLATION") == 0 ||
3641 strcmp(te->desc, "CONVERSION") == 0 ||
3642 strcmp(te->desc, "DATABASE") == 0 ||
3643 strcmp(te->desc, "DOMAIN") == 0 ||
3644 strcmp(te->desc, "FUNCTION") == 0 ||
3645 strcmp(te->desc, "OPERATOR") == 0 ||
3646 strcmp(te->desc, "OPERATOR CLASS") == 0 ||
3647 strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
3648 strcmp(te->desc, "PROCEDURE") == 0 ||
3649 strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
3650 strcmp(te->desc, "SCHEMA") == 0 ||
3651 strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3652 strcmp(te->desc, "TABLE") == 0 ||
3653 strcmp(te->desc, "TYPE") == 0 ||
3654 strcmp(te->desc, "VIEW") == 0 ||
3655 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3656 strcmp(te->desc, "SEQUENCE") == 0 ||
3657 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3658 strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
3659 strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
3660 strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
3661 strcmp(te->desc, "SERVER") == 0 ||
3662 strcmp(te->desc, "STATISTICS") == 0 ||
3663 strcmp(te->desc, "PUBLICATION") == 0 ||
3664 strcmp(te->desc, "SUBSCRIPTION") == 0)
3665 {
3666 PQExpBuffer temp = createPQExpBuffer();
3667
3668 appendPQExpBufferStr(temp, "ALTER ");
3669 _getObjectDescription(temp, te, AH);
3670 appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
3671 ahprintf(AH, "%s\n\n", temp->data);
3672 destroyPQExpBuffer(temp);
3673 }
3674 else if (strcmp(te->desc, "CAST") == 0 ||
3675 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3676 strcmp(te->desc, "CONSTRAINT") == 0 ||
3677 strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
3678 strcmp(te->desc, "DEFAULT") == 0 ||
3679 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3680 strcmp(te->desc, "INDEX") == 0 ||
3681 strcmp(te->desc, "RULE") == 0 ||
3682 strcmp(te->desc, "TRIGGER") == 0 ||
3683 strcmp(te->desc, "ROW SECURITY") == 0 ||
3684 strcmp(te->desc, "POLICY") == 0 ||
3685 strcmp(te->desc, "USER MAPPING") == 0)
3686 {
3687 /* these object types don't have separate owners */
3688 }
3689 else
3690 {
3691 pg_log_warning("don't know how to set owner for object type \"%s\"",
3692 te->desc);
3693 }
3694 }
3695
3696 /*
3697 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3698 * commands, so we can no longer assume we know the current auth setting.
3699 */
3700 if (_tocEntryIsACL(te))
3701 {
3702 if (AH->currUser)
3703 free(AH->currUser);
3704 AH->currUser = NULL;
3705 }
3706}
3707
/*
 * Make a string safe for inclusion in an SQL comment or TOC listing by
 * turning every newline or carriage return into a space.  That guarantees
 * each logical output line is a single physical line, which prevents
 * corruption of the dump (in the worst case an SQL injection vulnerability,
 * should someone incautiously load a dump containing objects with
 * maliciously crafted names).
 *
 * The result is always a freshly malloc'd string.  A NULL input yields a
 * malloc'ed empty string, or a malloc'ed hyphen if want_hyphen is set.
 *
 * Note that we currently don't bother to quote names, meaning that the name
 * fields aren't automatically parseable.  "pg_restore -L" doesn't care because
 * it only examines the dumpId field, but someday we might want to try harder.
 */
static char *
sanitize_line(const char *str, bool want_hyphen)
{
	char	   *copy;
	size_t		pos;

	if (str == NULL)
		return pg_strdup(want_hyphen ? "-" : "");

	copy = pg_strdup(str);

	for (pos = 0; copy[pos] != '\0'; pos++)
	{
		switch (copy[pos])
		{
			case '\n':
			case '\r':
				copy[pos] = ' ';
				break;
			default:
				break;
		}
	}

	return copy;
}
3743
3744/*
3745 * Write the file header for a custom-format archive
3746 */
3747void
3748WriteHead(ArchiveHandle *AH)
3749{
3750 struct tm crtm;
3751
3752 AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3753 AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3754 AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3755 AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3756 AH->WriteBytePtr(AH, AH->intSize);
3757 AH->WriteBytePtr(AH, AH->offSize);
3758 AH->WriteBytePtr(AH, AH->format);
3759 WriteInt(AH, AH->compression);
3760 crtm = *localtime(&AH->createDate);
3761 WriteInt(AH, crtm.tm_sec);
3762 WriteInt(AH, crtm.tm_min);
3763 WriteInt(AH, crtm.tm_hour);
3764 WriteInt(AH, crtm.tm_mday);
3765 WriteInt(AH, crtm.tm_mon);
3766 WriteInt(AH, crtm.tm_year);
3767 WriteInt(AH, crtm.tm_isdst);
3768 WriteStr(AH, PQdb(AH->connection));
3769 WriteStr(AH, AH->public.remoteVersionStr);
3770 WriteStr(AH, PG_VERSION);
3771}
3772
3773void
3774ReadHead(ArchiveHandle *AH)
3775{
3776 char tmpMag[7];
3777 int fmt;
3778 struct tm crtm;
3779
3780 /*
3781 * If we haven't already read the header, do so.
3782 *
3783 * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3784 * way to unify the cases?
3785 */
3786 if (!AH->readHeader)
3787 {
3788 char vmaj,
3789 vmin,
3790 vrev;
3791
3792 AH->ReadBufPtr(AH, tmpMag, 5);
3793
3794 if (strncmp(tmpMag, "PGDMP", 5) != 0)
3795 fatal("did not find magic string in file header");
3796
3797 vmaj = AH->ReadBytePtr(AH);
3798 vmin = AH->ReadBytePtr(AH);
3799
3800 if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
3801 vrev = AH->ReadBytePtr(AH);
3802 else
3803 vrev = 0;
3804
3805 AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
3806
3807 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
3808 fatal("unsupported version (%d.%d) in file header",
3809 vmaj, vmin);
3810
3811 AH->intSize = AH->ReadBytePtr(AH);
3812 if (AH->intSize > 32)
3813 fatal("sanity check on integer size (%lu) failed",
3814 (unsigned long) AH->intSize);
3815
3816 if (AH->intSize > sizeof(int))
3817 pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
3818
3819 if (AH->version >= K_VERS_1_7)
3820 AH->offSize = AH->ReadBytePtr(AH);
3821 else
3822 AH->offSize = AH->intSize;
3823
3824 fmt = AH->ReadBytePtr(AH);
3825
3826 if (AH->format != fmt)
3827 fatal("expected format (%d) differs from format found in file (%d)",
3828 AH->format, fmt);
3829 }
3830
3831 if (AH->version >= K_VERS_1_2)
3832 {
3833 if (AH->version < K_VERS_1_4)
3834 AH->compression = AH->ReadBytePtr(AH);
3835 else
3836 AH->compression = ReadInt(AH);
3837 }
3838 else
3839 AH->compression = Z_DEFAULT_COMPRESSION;
3840
3841#ifndef HAVE_LIBZ
3842 if (AH->compression != 0)
3843 pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
3844#endif
3845
3846 if (AH->version >= K_VERS_1_4)
3847 {
3848 crtm.tm_sec = ReadInt(AH);
3849 crtm.tm_min = ReadInt(AH);
3850 crtm.tm_hour = ReadInt(AH);
3851 crtm.tm_mday = ReadInt(AH);
3852 crtm.tm_mon = ReadInt(AH);
3853 crtm.tm_year = ReadInt(AH);
3854 crtm.tm_isdst = ReadInt(AH);
3855
3856 AH->archdbname = ReadStr(AH);
3857
3858 AH->createDate = mktime(&crtm);
3859
3860 if (AH->createDate == (time_t) -1)
3861 pg_log_warning("invalid creation date in header");
3862 }
3863
3864 if (AH->version >= K_VERS_1_10)
3865 {
3866 AH->archiveRemoteVersion = ReadStr(AH);
3867 AH->archiveDumpVersion = ReadStr(AH);
3868 }
3869}
3870
3871
3872/*
3873 * checkSeek
3874 * check to see if ftell/fseek can be performed.
3875 */
3876bool
3877checkSeek(FILE *fp)
3878{
3879 pgoff_t tpos;
3880
3881 /*
3882 * If pgoff_t is wider than long, we must have "real" fseeko and not an
3883 * emulation using fseek. Otherwise report no seek capability.
3884 */
3885#ifndef HAVE_FSEEKO
3886 if (sizeof(pgoff_t) > sizeof(long))
3887 return false;
3888#endif
3889
3890 /* Check that ftello works on this file */
3891 tpos = ftello(fp);
3892 if (tpos < 0)
3893 return false;
3894
3895 /*
3896 * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
3897 * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
3898 * successful no-op even on files that are otherwise unseekable.
3899 */
3900 if (fseeko(fp, tpos, SEEK_SET) != 0)
3901 return false;
3902
3903 return true;
3904}
3905
3906
3907/*
3908 * dumpTimestamp
3909 */
3910static void
3911dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3912{
3913 char buf[64];
3914
3915 if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3916 ahprintf(AH, "-- %s %s\n\n", msg, buf);
3917}
3918
3919/*
3920 * Main engine for parallel restore.
3921 *
3922 * Parallel restore is done in three phases. In this first phase,
3923 * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3924 * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
3925 * PRE_DATA items other than ACLs.) Entries we can't process now are
3926 * added to the pending_list for later phases to deal with.
3927 */
3928static void
3929restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3930{
3931 bool skipped_some;
3932 TocEntry *next_work_item;
3933
3934 pg_log_debug("entering restore_toc_entries_prefork");
3935
3936 /* Adjust dependency information */
3937 fix_dependencies(AH);
3938
3939 /*
3940 * Do all the early stuff in a single connection in the parent. There's no
3941 * great point in running it in parallel, in fact it will actually run
3942 * faster in a single connection because we avoid all the connection and
3943 * setup overhead. Also, pre-9.2 pg_dump versions were not very good
3944 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3945 * not risk trying to process them out-of-order.
3946 *
3947 * Stuff that we can't do immediately gets added to the pending_list.
3948 * Note: we don't yet filter out entries that aren't going to be restored.
3949 * They might participate in dependency chains connecting entries that
3950 * should be restored, so we treat them as live until we actually process
3951 * them.
3952 *
3953 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3954 * before DATA items, and all DATA items before POST_DATA items. That is
3955 * not certain to be true in older archives, though, and in any case use
3956 * of a list file would destroy that ordering (cf. SortTocFromFile). So
3957 * this loop cannot assume that it holds.
3958 */
3959 AH->restorePass = RESTORE_PASS_MAIN;
3960 skipped_some = false;
3961 for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3962 {
3963 bool do_now = true;
3964
3965 if (next_work_item->section != SECTION_PRE_DATA)
3966 {
3967 /* DATA and POST_DATA items are just ignored for now */
3968 if (next_work_item->section == SECTION_DATA ||
3969 next_work_item->section == SECTION_POST_DATA)
3970 {
3971 do_now = false;
3972 skipped_some = true;
3973 }
3974 else
3975 {
3976 /*
3977 * SECTION_NONE items, such as comments, can be processed now
3978 * if we are still in the PRE_DATA part of the archive. Once
3979 * we've skipped any items, we have to consider whether the
3980 * comment's dependencies are satisfied, so skip it for now.
3981 */
3982 if (skipped_some)
3983 do_now = false;
3984 }
3985 }
3986
3987 /*
3988 * Also skip items that need to be forced into later passes. We need
3989 * not set skipped_some in this case, since by assumption no main-pass
3990 * items could depend on these.
3991 */
3992 if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3993 do_now = false;
3994
3995 if (do_now)
3996 {
3997 /* OK, restore the item and update its dependencies */
3998 pg_log_info("processing item %d %s %s",
3999 next_work_item->dumpId,
4000 next_work_item->desc, next_work_item->tag);
4001
4002 (void) restore_toc_entry(AH, next_work_item, false);
4003
4004 /* Reduce dependencies, but don't move anything to ready_list */
4005 reduce_dependencies(AH, next_work_item, NULL);
4006 }
4007 else
4008 {
4009 /* Nope, so add it to pending_list */
4010 pending_list_append(pending_list, next_work_item);
4011 }
4012 }
4013
4014 /*
4015 * Now close parent connection in prep for parallel steps. We do this
4016 * mainly to ensure that we don't exceed the specified number of parallel
4017 * connections.
4018 */
4019 DisconnectDatabase(&AH->public);
4020
4021 /* blow away any transient state from the old connection */
4022 if (AH->currUser)
4023 free(AH->currUser);
4024 AH->currUser = NULL;
4025 if (AH->currSchema)
4026 free(AH->currSchema);
4027 AH->currSchema = NULL;
4028 if (AH->currTablespace)
4029 free(AH->currTablespace);
4030 AH->currTablespace = NULL;
4031 if (AH->currTableAm)
4032 free(AH->currTableAm);
4033 AH->currTableAm = NULL;
4034}
4035
4036/*
4037 * Main engine for parallel restore.
4038 *
4039 * Parallel restore is done in three phases. In this second phase,
4040 * we process entries by dispatching them to parallel worker children
4041 * (processes on Unix, threads on Windows), each of which connects
4042 * separately to the database. Inter-entry dependencies are respected,
4043 * and so is the RestorePass multi-pass structure. When we can no longer
4044 * make any entries ready to process, we exit. Normally, there will be
4045 * nothing left to do; but if there is, the third phase will mop up.
4046 */
4047static void
4048restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4049 TocEntry *pending_list)
4050{
4051 ParallelReadyList ready_list;
4052 TocEntry *next_work_item;
4053
4054 pg_log_debug("entering restore_toc_entries_parallel");
4055
4056 /* Set up ready_list with enough room for all known TocEntrys */
4057 ready_list_init(&ready_list, AH->tocCount);
4058
4059 /*
4060 * The pending_list contains all items that we need to restore. Move all
4061 * items that are available to process immediately into the ready_list.
4062 * After this setup, the pending list is everything that needs to be done
4063 * but is blocked by one or more dependencies, while the ready list
4064 * contains items that have no remaining dependencies and are OK to
4065 * process in the current restore pass.
4066 */
4067 AH->restorePass = RESTORE_PASS_MAIN;
4068 move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4069
4070 /*
4071 * main parent loop
4072 *
4073 * Keep going until there is no worker still running AND there is no work
4074 * left to be done. Note invariant: at top of loop, there should always
4075 * be at least one worker available to dispatch a job to.
4076 */
4077 pg_log_info("entering main parallel loop");
4078
4079 for (;;)
4080 {
4081 /* Look for an item ready to be dispatched to a worker */
4082 next_work_item = pop_next_work_item(AH, &ready_list, pstate);
4083 if (next_work_item != NULL)
4084 {
4085 /* If not to be restored, don't waste time launching a worker */
4086 if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4087 {
4088 pg_log_info("skipping item %d %s %s",
4089 next_work_item->dumpId,
4090 next_work_item->desc, next_work_item->tag);
4091 /* Update its dependencies as though we'd completed it */
4092 reduce_dependencies(AH, next_work_item, &ready_list);
4093 /* Loop around to see if anything else can be dispatched */
4094 continue;
4095 }
4096
4097 pg_log_info("launching item %d %s %s",
4098 next_work_item->dumpId,
4099 next_work_item->desc, next_work_item->tag);
4100
4101 /* Dispatch to some worker */
4102 DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4103 mark_restore_job_done, &ready_list);
4104 }
4105 else if (IsEveryWorkerIdle(pstate))
4106 {
4107 /*
4108 * Nothing is ready and no worker is running, so we're done with
4109 * the current pass or maybe with the whole process.
4110 */
4111 if (AH->restorePass == RESTORE_PASS_LAST)
4112 break; /* No more parallel processing is possible */
4113
4114 /* Advance to next restore pass */
4115 AH->restorePass++;
4116 /* That probably allows some stuff to be made ready */
4117 move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4118 /* Loop around to see if anything's now ready */
4119 continue;
4120 }
4121 else
4122 {
4123 /*
4124 * We have nothing ready, but at least one child is working, so
4125 * wait for some subjob to finish.
4126 */
4127 }
4128
4129 /*
4130 * Before dispatching another job, check to see if anything has
4131 * finished. We should check every time through the loop so as to
4132 * reduce dependencies as soon as possible. If we were unable to
4133 * dispatch any job this time through, wait until some worker finishes
4134 * (and, hopefully, unblocks some pending item). If we did dispatch
4135 * something, continue as soon as there's at least one idle worker.
4136 * Note that in either case, there's guaranteed to be at least one
4137 * idle worker when we return to the top of the loop. This ensures we
4138 * won't block inside DispatchJobForTocEntry, which would be
4139 * undesirable: we'd rather postpone dispatching until we see what's
4140 * been unblocked by finished jobs.
4141 */
4142 WaitForWorkers(AH, pstate,
4143 next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4144 }
4145
4146 /* There should now be nothing in ready_list. */
4147 Assert(ready_list.first_te > ready_list.last_te);
4148
4149 ready_list_free(&ready_list);
4150
4151 pg_log_info("finished main parallel loop");
4152}
4153
4154/*
4155 * Main engine for parallel restore.
4156 *
4157 * Parallel restore is done in three phases. In this third phase,
4158 * we mop up any remaining TOC entries by processing them serially.
4159 * This phase normally should have nothing to do, but if we've somehow
4160 * gotten stuck due to circular dependencies or some such, this provides
4161 * at least some chance of completing the restore successfully.
4162 */
4163static void
4164restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4165{
4166 RestoreOptions *ropt = AH->public.ropt;
4167 TocEntry *te;
4168
4169 pg_log_debug("entering restore_toc_entries_postfork");
4170
4171 /*
4172 * Now reconnect the single parent connection.
4173 */
4174 ConnectDatabase((Archive *) AH, ropt->dbname,
4175 ropt->pghost, ropt->pgport, ropt->username,
4176 ropt->promptPassword);
4177
4178 /* re-establish fixed state */
4179 _doSetFixedOutputState(AH);
4180
4181 /*
4182 * Make sure there is no work left due to, say, circular dependencies, or
4183 * some other pathological condition. If so, do it in the single parent
4184 * connection. We don't sweat about RestorePass ordering; it's likely we
4185 * already violated that.
4186 */
4187 for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4188 {
4189 pg_log_info("processing missed item %d %s %s",
4190 te->dumpId, te->desc, te->tag);
4191 (void) restore_toc_entry(AH, te, false);
4192 }
4193}
4194
4195/*
4196 * Check if te1 has an exclusive lock requirement for an item that te2 also
4197 * requires, whether or not te2's requirement is for an exclusive lock.
4198 */
4199static bool
4200has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4201{
4202 int j,
4203 k;
4204
4205 for (j = 0; j < te1->nLockDeps; j++)
4206 {
4207 for (k = 0; k < te2->nDeps; k++)
4208 {
4209 if (te1->lockDeps[j] == te2->dependencies[k])
4210 return true;
4211 }
4212 }
4213 return false;
4214}
4215
4216
4217/*
4218 * Initialize the header of the pending-items list.
4219 *
4220 * This is a circular list with a dummy TocEntry as header, just like the
4221 * main TOC list; but we use separate list links so that an entry can be in
4222 * the main TOC list as well as in the pending list.
4223 */
4224static void
4225pending_list_header_init(TocEntry *l)
4226{
4227 l->pending_prev = l->pending_next = l;
4228}
4229
4230/* Append te to the end of the pending-list headed by l */
4231static void
4232pending_list_append(TocEntry *l, TocEntry *te)
4233{
4234 te->pending_prev = l->pending_prev;
4235 l->pending_prev->pending_next = te;
4236 l->pending_prev = te;
4237 te->pending_next = l;
4238}
4239
4240/* Remove te from the pending-list */
4241static void
4242pending_list_remove(TocEntry *te)
4243{
4244 te->pending_prev->pending_next = te->pending_next;
4245 te->pending_next->pending_prev = te->pending_prev;
4246 te->pending_prev = NULL;
4247 te->pending_next = NULL;
4248}
4249
4250
4251/*
4252 * Initialize the ready_list with enough room for up to tocCount entries.
4253 */
4254static void
4255ready_list_init(ParallelReadyList *ready_list, int tocCount)
4256{
4257 ready_list->tes = (TocEntry **)
4258 pg_malloc(tocCount * sizeof(TocEntry *));
4259 ready_list->first_te = 0;
4260 ready_list->last_te = -1;
4261 ready_list->sorted = false;
4262}
4263
4264/*
4265 * Free storage for a ready_list.
4266 */
4267static void
4268ready_list_free(ParallelReadyList *ready_list)
4269{
4270 pg_free(ready_list->tes);
4271}
4272
4273/* Add te to the ready_list */
4274static void
4275ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
4276{
4277 ready_list->tes[++ready_list->last_te] = te;
4278 /* List is (probably) not sorted anymore. */
4279 ready_list->sorted = false;
4280}
4281
4282/* Remove the i'th entry in the ready_list */
4283static void
4284ready_list_remove(ParallelReadyList *ready_list, int i)
4285{
4286 int f = ready_list->first_te;
4287
4288 Assert(i >= f && i <= ready_list->last_te);
4289
4290 /*
4291 * In the typical case where the item to be removed is the first ready
4292 * entry, we need only increment first_te to remove it. Otherwise, move
4293 * the entries before it to compact the list. (This preserves sortedness,
4294 * if any.) We could alternatively move the entries after i, but there
4295 * are typically many more of those.
4296 */
4297 if (i > f)
4298 {
4299 TocEntry **first_te_ptr = &ready_list->tes[f];
4300
4301 memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
4302 }
4303 ready_list->first_te++;
4304}
4305
4306/* Sort the ready_list into the desired order */
4307static void
4308ready_list_sort(ParallelReadyList *ready_list)
4309{
4310 if (!ready_list->sorted)
4311 {
4312 int n = ready_list->last_te - ready_list->first_te + 1;
4313
4314 if (n > 1)
4315 qsort(ready_list->tes + ready_list->first_te, n,
4316 sizeof(TocEntry *),
4317 TocEntrySizeCompare);
4318 ready_list->sorted = true;
4319 }
4320}
4321
4322/* qsort comparator for sorting TocEntries by dataLength */
4323static int
4324TocEntrySizeCompare(const void *p1, const void *p2)
4325{
4326 const TocEntry *te1 = *(const TocEntry *const *) p1;
4327 const TocEntry *te2 = *(const TocEntry *const *) p2;
4328
4329 /* Sort by decreasing dataLength */
4330 if (te1->dataLength > te2->dataLength)
4331 return -1;
4332 if (te1->dataLength < te2->dataLength)
4333 return 1;
4334
4335 /* For equal dataLengths, sort by dumpId, just to be stable */
4336 if (te1->dumpId < te2->dumpId)
4337 return -1;
4338 if (te1->dumpId > te2->dumpId)
4339 return 1;
4340
4341 return 0;
4342}
4343
4344
4345/*
4346 * Move all immediately-ready items from pending_list to ready_list.
4347 *
4348 * Items are considered ready if they have no remaining dependencies and
4349 * they belong in the current restore pass. (See also reduce_dependencies,
4350 * which applies the same logic one-at-a-time.)
4351 */
4352static void
4353move_to_ready_list(TocEntry *pending_list,
4354 ParallelReadyList *ready_list,
4355 RestorePass pass)
4356{
4357 TocEntry *te;
4358 TocEntry *next_te;
4359
4360 for (te = pending_list->pending_next; te != pending_list; te = next_te)
4361 {
4362 /* must save list link before possibly removing te from list */
4363 next_te = te->pending_next;
4364
4365 if (te->depCount == 0 &&
4366 _tocEntryRestorePass(te) == pass)
4367 {
4368 /* Remove it from pending_list ... */
4369 pending_list_remove(te);
4370 /* ... and add to ready_list */
4371 ready_list_insert(ready_list, te);
4372 }
4373 }
4374}
4375
4376/*
4377 * Find the next work item (if any) that is capable of being run now,
4378 * and remove it from the ready_list.
4379 *
4380 * Returns the item, or NULL if nothing is runnable.
4381 *
4382 * To qualify, the item must have no remaining dependencies
4383 * and no requirements for locks that are incompatible with
4384 * items currently running. Items in the ready_list are known to have
4385 * no remaining dependencies, but we have to check for lock conflicts.
4386 */
4387static TocEntry *
4388pop_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list,
4389 ParallelState *pstate)
4390{
4391 /*
4392 * Sort the ready_list so that we'll tackle larger jobs first.
4393 */
4394 ready_list_sort(ready_list);
4395
4396 /*
4397 * Search the ready_list until we find a suitable item.
4398 */
4399 for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
4400 {
4401 TocEntry *te = ready_list->tes[i];
4402 bool conflicts = false;
4403
4404 /*
4405 * Check to see if the item would need exclusive lock on something
4406 * that a currently running item also needs lock on, or vice versa. If
4407 * so, we don't want to schedule them together.
4408 */
4409 for (int k = 0; k < pstate->numWorkers; k++)
4410 {
4411 TocEntry *running_te = pstate->te[k];
4412
4413 if (running_te == NULL)
4414 continue;
4415 if (has_lock_conflicts(te, running_te) ||
4416 has_lock_conflicts(running_te, te))
4417 {
4418 conflicts = true;
4419 break;
4420 }
4421 }
4422
4423 if (conflicts)
4424 continue;
4425
4426 /* passed all tests, so this item can run */
4427 ready_list_remove(ready_list, i);
4428 return te;
4429 }
4430
4431 pg_log_debug("no item ready");
4432 return NULL;
4433}
4434
4435
4436/*
4437 * Restore a single TOC item in parallel with others
4438 *
4439 * this is run in the worker, i.e. in a thread (Windows) or a separate process
4440 * (everything else). A worker process executes several such work items during
4441 * a parallel backup or restore. Once we terminate here and report back that
4442 * our work is finished, the master process will assign us a new work item.
4443 */
4444int
4445parallel_restore(ArchiveHandle *AH, TocEntry *te)
4446{
4447 int status;
4448
4449 Assert(AH->connection != NULL);
4450
4451 /* Count only errors associated with this TOC entry */
4452 AH->public.n_errors = 0;
4453
4454 /* Restore the TOC item */
4455 status = restore_toc_entry(AH, te, true);
4456
4457 return status;
4458}
4459
4460
4461/*
4462 * Callback function that's invoked in the master process after a step has
4463 * been parallel restored.
4464 *
4465 * Update status and reduce the dependency count of any dependent items.
4466 */
4467static void
4468mark_restore_job_done(ArchiveHandle *AH,
4469 TocEntry *te,
4470 int status,
4471 void *callback_data)
4472{
4473 ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
4474
4475 pg_log_info("finished item %d %s %s",
4476 te->dumpId, te->desc, te->tag);
4477
4478 if (status == WORKER_CREATE_DONE)
4479 mark_create_done(AH, te);
4480 else if (status == WORKER_INHIBIT_DATA)
4481 {
4482 inhibit_data_for_failed_table(AH, te);
4483 AH->public.n_errors++;
4484 }
4485 else if (status == WORKER_IGNORED_ERRORS)
4486 AH->public.n_errors++;
4487 else if (status != 0)
4488 fatal("worker process failed: exit code %d",
4489 status);
4490
4491 reduce_dependencies(AH, te, ready_list);
4492}
4493
4494
4495/*
4496 * Process the dependency information into a form useful for parallel restore.
4497 *
4498 * This function takes care of fixing up some missing or badly designed
4499 * dependencies, and then prepares subsidiary data structures that will be
4500 * used in the main parallel-restore logic, including:
4501 * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4502 * 2. We set up depCount fields that are the number of as-yet-unprocessed
4503 * dependencies for each TOC entry.
4504 *
4505 * We also identify locking dependencies so that we can avoid trying to
4506 * schedule conflicting items at the same time.
4507 */
4508static void
4509fix_dependencies(ArchiveHandle *AH)
4510{
4511 TocEntry *te;
4512 int i;
4513
4514 /*
4515 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4516 * items are marked as not being in any parallel-processing list.
4517 */
4518 for (te = AH->toc->next; te != AH->toc; te = te->next)
4519 {
4520 te->depCount = te->nDeps;
4521 te->revDeps = NULL;
4522 te->nRevDeps = 0;
4523 te->pending_prev = NULL;
4524 te->pending_next = NULL;
4525 }
4526
4527 /*
4528 * POST_DATA items that are shown as depending on a table need to be
4529 * re-pointed to depend on that table's data, instead. This ensures they
4530 * won't get scheduled until the data has been loaded.
4531 */
4532 repoint_table_dependencies(AH);
4533
4534 /*
4535 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4536 * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4537 * one BLOB COMMENTS in such files.)
4538 */
4539 if (AH->version < K_VERS_1_11)
4540 {
4541 for (te = AH->toc->next; te != AH->toc; te = te->next)
4542 {
4543 if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4544 {
4545 TocEntry *te2;
4546
4547 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4548 {
4549 if (strcmp(te2->desc, "BLOBS") == 0)
4550 {
4551 te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4552 te->dependencies[0] = te2->dumpId;
4553 te->nDeps++;
4554 te->depCount++;
4555 break;
4556 }
4557 }
4558 break;
4559 }
4560 }
4561 }
4562
4563 /*
4564 * At this point we start to build the revDeps reverse-dependency arrays,
4565 * so all changes of dependencies must be complete.
4566 */
4567
4568 /*
4569 * Count the incoming dependencies for each item. Also, it is possible
4570 * that the dependencies list items that are not in the archive at all
4571 * (that should not happen in 9.2 and later, but is highly likely in older
4572 * archives). Subtract such items from the depCounts.
4573 */
4574 for (te = AH->toc->next; te != AH->toc; te = te->next)
4575 {
4576 for (i = 0; i < te->nDeps; i++)
4577 {
4578 DumpId depid = te->dependencies[i];
4579
4580 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4581 AH->tocsByDumpId[depid]->nRevDeps++;
4582 else
4583 te->depCount--;
4584 }
4585 }
4586
4587 /*
4588 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4589 * it as a counter below.
4590 */
4591 for (te = AH->toc->next; te != AH->toc; te = te->next)
4592 {
4593 if (te->nRevDeps > 0)
4594 te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4595 te->nRevDeps = 0;
4596 }
4597
4598 /*
4599 * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4600 * better agree with the loops above.
4601 */
4602 for (te = AH->toc->next; te != AH->toc; te = te->next)
4603 {
4604 for (i = 0; i < te->nDeps; i++)
4605 {
4606 DumpId depid = te->dependencies[i];
4607
4608 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4609 {
4610 TocEntry *otherte = AH->tocsByDumpId[depid];
4611
4612 otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4613 }
4614 }
4615 }
4616
4617 /*
4618 * Lastly, work out the locking dependencies.
4619 */
4620 for (te = AH->toc->next; te != AH->toc; te = te->next)
4621 {
4622 te->lockDeps = NULL;
4623 te->nLockDeps = 0;
4624 identify_locking_dependencies(AH, te);
4625 }
4626}
4627
4628/*
4629 * Change dependencies on table items to depend on table data items instead,
4630 * but only in POST_DATA items.
4631 *
4632 * Also, for any item having such dependency(s), set its dataLength to the
4633 * largest dataLength of the table data items it depends on. This ensures
4634 * that parallel restore will prioritize larger jobs (index builds, FK
4635 * constraint checks, etc) over smaller ones, avoiding situations where we
4636 * end a restore with only one active job working on a large table.
4637 */
4638static void
4639repoint_table_dependencies(ArchiveHandle *AH)
4640{
4641 TocEntry *te;
4642 int i;
4643 DumpId olddep;
4644
4645 for (te = AH->toc->next; te != AH->toc; te = te->next)
4646 {
4647 if (te->section != SECTION_POST_DATA)
4648 continue;
4649 for (i = 0; i < te->nDeps; i++)
4650 {
4651 olddep = te->dependencies[i];
4652 if (olddep <= AH->maxDumpId &&
4653 AH->tableDataId[olddep] != 0)
4654 {
4655 DumpId tabledataid = AH->tableDataId[olddep];
4656 TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4657
4658 te->dependencies[i] = tabledataid;
4659 te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4660 pg_log_debug("transferring dependency %d -> %d to %d",
4661 te->dumpId, olddep, tabledataid);
4662 }
4663 }
4664 }
4665}
4666
4667/*
4668 * Identify which objects we'll need exclusive lock on in order to restore
4669 * the given TOC entry (*other* than the one identified by the TOC entry
4670 * itself). Record their dump IDs in the entry's lockDeps[] array.
4671 */
4672static void
4673identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4674{
4675 DumpId *lockids;
4676 int nlockids;
4677 int i;
4678
4679 /*
4680 * We only care about this for POST_DATA items. PRE_DATA items are not
4681 * run in parallel, and DATA items are all independent by assumption.
4682 */
4683 if (te->section != SECTION_POST_DATA)
4684 return;
4685
4686 /* Quick exit if no dependencies at all */
4687 if (te->nDeps == 0)
4688 return;
4689
4690 /*
4691 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4692 * and hence require exclusive lock. However, we know that CREATE INDEX
4693 * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4694 * category too ... but today is not that day.)
4695 */
4696 if (strcmp(te->desc, "INDEX") == 0)
4697 return;
4698
4699 /*
4700 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4701 * item listed among its dependencies. Originally all of these would have
4702 * been TABLE items, but repoint_table_dependencies would have repointed
4703 * them to the TABLE DATA items if those are present (which they might not
4704 * be, eg in a schema-only dump). Note that all of the entries we are
4705 * processing here are POST_DATA; otherwise there might be a significant
4706 * difference between a dependency on a table and a dependency on its
4707 * data, so that closer analysis would be needed here.
4708 */
4709 lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4710 nlockids = 0;
4711 for (i = 0; i < te->nDeps; i++)
4712 {
4713 DumpId depid = te->dependencies[i];
4714
4715 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4716 ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4717 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4718 lockids[nlockids++] = depid;
4719 }
4720
4721 if (nlockids == 0)
4722 {
4723 free(lockids);
4724 return;
4725 }
4726
4727 te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4728 te->nLockDeps = nlockids;
4729}
4730
4731/*
4732 * Remove the specified TOC entry from the depCounts of items that depend on
4733 * it, thereby possibly making them ready-to-run. Any pending item that
4734 * becomes ready should be moved to the ready_list, if that's provided.
4735 */
4736static void
4737reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4738 ParallelReadyList *ready_list)
4739{
4740 int i;
4741
4742 pg_log_debug("reducing dependencies for %d", te->dumpId);
4743
4744 for (i = 0; i < te->nRevDeps; i++)
4745 {
4746 TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4747
4748 Assert(otherte->depCount > 0);
4749 otherte->depCount--;
4750
4751 /*
4752 * It's ready if it has no remaining dependencies, and it belongs in
4753 * the current restore pass, and it is currently a member of the
4754 * pending list (that check is needed to prevent double restore in
4755 * some cases where a list-file forces out-of-order restoring).
4756 * However, if ready_list == NULL then caller doesn't want any list
4757 * memberships changed.
4758 */
4759 if (otherte->depCount == 0 &&
4760 _tocEntryRestorePass(otherte) == AH->restorePass &&
4761 otherte->pending_prev != NULL &&
4762 ready_list != NULL)
4763 {
4764 /* Remove it from pending list ... */
4765 pending_list_remove(otherte);
4766 /* ... and add to ready_list */
4767 ready_list_insert(ready_list, otherte);
4768 }
4769 }
4770}
4771
4772/*
4773 * Set the created flag on the DATA member corresponding to the given
4774 * TABLE member
4775 */
4776static void
4777mark_create_done(ArchiveHandle *AH, TocEntry *te)
4778{
4779 if (AH->tableDataId[te->dumpId] != 0)
4780 {
4781 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4782
4783 ted->created = true;
4784 }
4785}
4786
4787/*
4788 * Mark the DATA member corresponding to the given TABLE member
4789 * as not wanted
4790 */
4791static void
4792inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4793{
4794 pg_log_info("table \"%s\" could not be created, will not restore its data",
4795 te->tag);
4796
4797 if (AH->tableDataId[te->dumpId] != 0)
4798 {
4799 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4800
4801 ted->reqs = 0;
4802 }
4803}
4804
4805/*
4806 * Clone and de-clone routines used in parallel restoration.
4807 *
4808 * Enough of the structure is cloned to ensure that there is no
4809 * conflict between different threads each with their own clone.
4810 */
4811ArchiveHandle *
4812CloneArchive(ArchiveHandle *AH)
4813{
4814 ArchiveHandle *clone;
4815
4816 /* Make a "flat" copy */
4817 clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4818 memcpy(clone, AH, sizeof(ArchiveHandle));
4819
4820 /* Handle format-independent fields */
4821 memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4822
4823 /* The clone will have its own connection, so disregard connection state */
4824 clone->connection = NULL;
4825 clone->connCancel = NULL;
4826 clone->currUser = NULL;
4827 clone->currSchema = NULL;
4828 clone->currTablespace = NULL;
4829
4830 /* savedPassword must be local in case we change it while connecting */
4831 if (clone->savedPassword)
4832 clone->savedPassword = pg_strdup(clone->savedPassword);
4833
4834 /* clone has its own error count, too */
4835 clone->public.n_errors = 0;
4836
4837 /*
4838 * Connect our new clone object to the database: In parallel restore the
4839 * parent is already disconnected, because we can connect the worker
4840 * processes independently to the database (no snapshot sync required). In
4841 * parallel backup we clone the parent's existing connection.
4842 */
4843 if (AH->mode == archModeRead)
4844 {
4845 RestoreOptions *ropt = AH->public.ropt;
4846
4847 Assert(AH->connection == NULL);
4848
4849 /* this also sets clone->connection */
4850 ConnectDatabase((Archive *) clone, ropt->dbname,
4851 ropt->pghost, ropt->pgport, ropt->username,
4852 ropt->promptPassword);
4853
4854 /* re-establish fixed state */
4855 _doSetFixedOutputState(clone);
4856 }
4857 else
4858 {
4859 PQExpBufferData connstr;
4860 char *pghost;
4861 char *pgport;
4862 char *username;
4863
4864 Assert(AH->connection != NULL);
4865
4866 /*
4867 * Even though we are technically accessing the parent's database
4868 * object here, these functions are fine to be called like that
4869 * because all just return a pointer and do not actually send/receive
4870 * any data to/from the database.
4871 */
4872 initPQExpBuffer(&connstr);
4873 appendPQExpBuffer(&connstr, "dbname=");
4874 appendConnStrVal(&connstr, PQdb(AH->connection));
4875 pghost = PQhost(AH->connection);
4876 pgport = PQport(AH->connection);
4877 username = PQuser(AH->connection);
4878
4879 /* this also sets clone->connection */
4880 ConnectDatabase((Archive *) clone, connstr.data,
4881 pghost, pgport, username, TRI_NO);
4882
4883 termPQExpBuffer(&connstr);
4884 /* setupDumpWorker will fix up connection state */
4885 }
4886
4887 /* Let the format-specific code have a chance too */
4888 clone->ClonePtr(clone);
4889
4890 Assert(clone->connection != NULL);
4891 return clone;
4892}
4893
4894/*
4895 * Release clone-local storage.
4896 *
4897 * Note: we assume any clone-local connection was already closed.
4898 */
4899void
4900DeCloneArchive(ArchiveHandle *AH)
4901{
4902 /* Should not have an open database connection */
4903 Assert(AH->connection == NULL);
4904
4905 /* Clear format-specific state */
4906 AH->DeClonePtr(AH);
4907
4908 /* Clear state allocated by CloneArchive */
4909 if (AH->sqlparse.curCmd)
4910 destroyPQExpBuffer(AH->sqlparse.curCmd);
4911
4912 /* Clear any connection-local state */
4913 if (AH->currUser)
4914 free(AH->currUser);
4915 if (AH->currSchema)
4916 free(AH->currSchema);
4917 if (AH->currTablespace)
4918 free(AH->currTablespace);
4919 if (AH->currTableAm)
4920 free(AH->currTableAm);
4921 if (AH->savedPassword)
4922 free(AH->savedPassword);
4923
4924 free(AH);
4925}
4926