1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * pg_backup_custom.c |
4 | * |
5 | * Implements the custom output format. |
6 | * |
7 | * The comments with the routines in this code are a good place to |
8 | * understand how to write a new format. |
9 | * |
10 | * See the headers to pg_restore for more details. |
11 | * |
12 | * Copyright (c) 2000, Philip Warner |
13 | * Rights are granted to use this software in any way so long |
14 | * as this notice is not removed. |
15 | * |
16 | * The author is not responsible for loss or damages that may |
17 | * and any liability will be limited to the time taken to fix any |
18 | * related bug. |
19 | * |
20 | * |
21 | * IDENTIFICATION |
22 | * src/bin/pg_dump/pg_backup_custom.c |
23 | * |
24 | *------------------------------------------------------------------------- |
25 | */ |
26 | #include "postgres_fe.h" |
27 | |
28 | #include "compress_io.h" |
29 | #include "parallel.h" |
30 | #include "pg_backup_utils.h" |
31 | #include "common/file_utils.h" |
32 | |
33 | |
/*--------
 * Routines in the format interface
 *--------
 */

/* Archiver callbacks; InitArchiveFmt_Custom stores these in the handle. */
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
static void _StartData(ArchiveHandle *AH, TocEntry *te);
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
static void _EndData(ArchiveHandle *AH, TocEntry *te);
static int _WriteByte(ArchiveHandle *AH, const int i);
static int _ReadByte(ArchiveHandle *);
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);

/* Helpers used by _PrintTocData to emit or skip over stored blocks. */
static void _PrintData(ArchiveHandle *AH);
static void _skipData(ArchiveHandle *AH);
static void _skipBlobs(ArchiveHandle *AH);

/* Large-object (BLOB) callbacks, plus the restore-side loader. */
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, bool drop);

/* Parallel-restore callbacks. */
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);

/* Worker entry point for restoring one TOC entry in parallel. */
static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
69 | |
/*
 * Format-private state for the whole archive; stored in AH->formatData.
 */
typedef struct
{
    CompressorState *cs;        /* compressor set up by _StartData/_StartBlob */
    int hasSeek;                /* result of checkSeek() on the archive file */
    pgoff_t filePos;            /* byte count maintained by the read/write
                                 * routines in this file */
    pgoff_t dataStart;          /* file position recorded just after the TOC
                                 * has been read or written */
} lclContext;
77 | |
/*
 * Format-private state for one TOC entry; stored in te->formatData.
 */
typedef struct
{
    int dataState;              /* one of the K_OFFSET_* states */
    pgoff_t dataPos;            /* file position of the entry's data block,
                                 * set by _StartData/_StartBlobs or read back
                                 * from the TOC */
} lclTocEntry;
83 | |
84 | |
/*------
 * Static declarations
 *------
 */

/* File-position bookkeeping and block-header parsing. */
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);

/* Callbacks handed to the compression layer for block-level I/O. */
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
94 | |
95 | |
96 | /* |
97 | * Init routine required by ALL formats. This is a global routine |
98 | * and should be declared in pg_backup_archiver.h |
99 | * |
100 | * It's task is to create any extra archive context (using AH->formatData), |
101 | * and to initialize the supported function pointers. |
102 | * |
103 | * It should also prepare whatever it's input source is for reading/writing, |
104 | * and in the case of a read mode connection, it should load the Header & TOC. |
105 | */ |
106 | void |
107 | InitArchiveFmt_Custom(ArchiveHandle *AH) |
108 | { |
109 | lclContext *ctx; |
110 | |
111 | /* Assuming static functions, this can be copied for each format. */ |
112 | AH->ArchiveEntryPtr = _ArchiveEntry; |
113 | AH->StartDataPtr = _StartData; |
114 | AH->WriteDataPtr = _WriteData; |
115 | AH->EndDataPtr = _EndData; |
116 | AH->WriteBytePtr = _WriteByte; |
117 | AH->ReadBytePtr = _ReadByte; |
118 | AH->WriteBufPtr = _WriteBuf; |
119 | AH->ReadBufPtr = _ReadBuf; |
120 | AH->ClosePtr = _CloseArchive; |
121 | AH->ReopenPtr = _ReopenArchive; |
122 | AH->PrintTocDataPtr = _PrintTocData; |
123 | AH->ReadExtraTocPtr = _ReadExtraToc; |
124 | AH->WriteExtraTocPtr = _WriteExtraToc; |
125 | AH->PrintExtraTocPtr = _PrintExtraToc; |
126 | |
127 | AH->StartBlobsPtr = _StartBlobs; |
128 | AH->StartBlobPtr = _StartBlob; |
129 | AH->EndBlobPtr = _EndBlob; |
130 | AH->EndBlobsPtr = _EndBlobs; |
131 | |
132 | AH->PrepParallelRestorePtr = _PrepParallelRestore; |
133 | AH->ClonePtr = _Clone; |
134 | AH->DeClonePtr = _DeClone; |
135 | |
136 | /* no parallel dump in the custom archive, only parallel restore */ |
137 | AH->WorkerJobDumpPtr = NULL; |
138 | AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom; |
139 | |
140 | /* Set up a private area. */ |
141 | ctx = (lclContext *) pg_malloc0(sizeof(lclContext)); |
142 | AH->formatData = (void *) ctx; |
143 | |
144 | /* Initialize LO buffering */ |
145 | AH->lo_buf_size = LOBBUFSIZE; |
146 | AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE); |
147 | |
148 | ctx->filePos = 0; |
149 | |
150 | /* |
151 | * Now open the file |
152 | */ |
153 | if (AH->mode == archModeWrite) |
154 | { |
155 | if (AH->fSpec && strcmp(AH->fSpec, "" ) != 0) |
156 | { |
157 | AH->FH = fopen(AH->fSpec, PG_BINARY_W); |
158 | if (!AH->FH) |
159 | fatal("could not open output file \"%s\": %m" , AH->fSpec); |
160 | } |
161 | else |
162 | { |
163 | AH->FH = stdout; |
164 | if (!AH->FH) |
165 | fatal("could not open output file: %m" ); |
166 | } |
167 | |
168 | ctx->hasSeek = checkSeek(AH->FH); |
169 | } |
170 | else |
171 | { |
172 | if (AH->fSpec && strcmp(AH->fSpec, "" ) != 0) |
173 | { |
174 | AH->FH = fopen(AH->fSpec, PG_BINARY_R); |
175 | if (!AH->FH) |
176 | fatal("could not open input file \"%s\": %m" , AH->fSpec); |
177 | } |
178 | else |
179 | { |
180 | AH->FH = stdin; |
181 | if (!AH->FH) |
182 | fatal("could not open input file: %m" ); |
183 | } |
184 | |
185 | ctx->hasSeek = checkSeek(AH->FH); |
186 | |
187 | ReadHead(AH); |
188 | ReadToc(AH); |
189 | ctx->dataStart = _getFilePos(AH, ctx); |
190 | } |
191 | |
192 | } |
193 | |
194 | /* |
195 | * Called by the Archiver when the dumper creates a new TOC entry. |
196 | * |
197 | * Optional. |
198 | * |
199 | * Set up extract format-related TOC data. |
200 | */ |
201 | static void |
202 | _ArchiveEntry(ArchiveHandle *AH, TocEntry *te) |
203 | { |
204 | lclTocEntry *ctx; |
205 | |
206 | ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry)); |
207 | if (te->dataDumper) |
208 | ctx->dataState = K_OFFSET_POS_NOT_SET; |
209 | else |
210 | ctx->dataState = K_OFFSET_NO_DATA; |
211 | |
212 | te->formatData = (void *) ctx; |
213 | } |
214 | |
215 | /* |
216 | * Called by the Archiver to save any extra format-related TOC entry |
217 | * data. |
218 | * |
219 | * Optional. |
220 | * |
221 | * Use the Archiver routines to write data - they are non-endian, and |
222 | * maintain other important file information. |
223 | */ |
224 | static void |
225 | (ArchiveHandle *AH, TocEntry *te) |
226 | { |
227 | lclTocEntry *ctx = (lclTocEntry *) te->formatData; |
228 | |
229 | WriteOffset(AH, ctx->dataPos, ctx->dataState); |
230 | } |
231 | |
232 | /* |
233 | * Called by the Archiver to read any extra format-related TOC data. |
234 | * |
235 | * Optional. |
236 | * |
237 | * Needs to match the order defined in _WriteExtraToc, and should also |
238 | * use the Archiver input routines. |
239 | */ |
240 | static void |
241 | (ArchiveHandle *AH, TocEntry *te) |
242 | { |
243 | lclTocEntry *ctx = (lclTocEntry *) te->formatData; |
244 | |
245 | if (ctx == NULL) |
246 | { |
247 | ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry)); |
248 | te->formatData = (void *) ctx; |
249 | } |
250 | |
251 | ctx->dataState = ReadOffset(AH, &(ctx->dataPos)); |
252 | |
253 | /* |
254 | * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't |
255 | * dump it at all. |
256 | */ |
257 | if (AH->version < K_VERS_1_7) |
258 | ReadInt(AH); |
259 | } |
260 | |
261 | /* |
262 | * Called by the Archiver when restoring an archive to output a comment |
263 | * that includes useful information about the TOC entry. |
264 | * |
265 | * Optional. |
266 | * |
267 | */ |
268 | static void |
269 | (ArchiveHandle *AH, TocEntry *te) |
270 | { |
271 | lclTocEntry *ctx = (lclTocEntry *) te->formatData; |
272 | |
273 | if (AH->public.verbose) |
274 | ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n" , |
275 | (int64) ctx->dataPos); |
276 | } |
277 | |
278 | /* |
279 | * Called by the archiver when saving TABLE DATA (not schema). This routine |
280 | * should save whatever format-specific information is needed to read |
281 | * the archive back. |
282 | * |
283 | * It is called just prior to the dumper's 'DataDumper' routine being called. |
284 | * |
285 | * Optional, but strongly recommended. |
286 | * |
287 | */ |
288 | static void |
289 | _StartData(ArchiveHandle *AH, TocEntry *te) |
290 | { |
291 | lclContext *ctx = (lclContext *) AH->formatData; |
292 | lclTocEntry *tctx = (lclTocEntry *) te->formatData; |
293 | |
294 | tctx->dataPos = _getFilePos(AH, ctx); |
295 | tctx->dataState = K_OFFSET_POS_SET; |
296 | |
297 | _WriteByte(AH, BLK_DATA); /* Block type */ |
298 | WriteInt(AH, te->dumpId); /* For sanity check */ |
299 | |
300 | ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); |
301 | } |
302 | |
303 | /* |
304 | * Called by archiver when dumper calls WriteData. This routine is |
305 | * called for both BLOB and TABLE data; it is the responsibility of |
306 | * the format to manage each kind of data using StartBlob/StartData. |
307 | * |
308 | * It should only be called from within a DataDumper routine. |
309 | * |
310 | * Mandatory. |
311 | */ |
312 | static void |
313 | _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) |
314 | { |
315 | lclContext *ctx = (lclContext *) AH->formatData; |
316 | CompressorState *cs = ctx->cs; |
317 | |
318 | if (dLen > 0) |
319 | /* WriteDataToArchive() internally throws write errors */ |
320 | WriteDataToArchive(AH, cs, data, dLen); |
321 | |
322 | return; |
323 | } |
324 | |
325 | /* |
326 | * Called by the archiver when a dumper's 'DataDumper' routine has |
327 | * finished. |
328 | * |
329 | * Optional. |
330 | * |
331 | */ |
332 | static void |
333 | _EndData(ArchiveHandle *AH, TocEntry *te) |
334 | { |
335 | lclContext *ctx = (lclContext *) AH->formatData; |
336 | |
337 | EndCompressor(AH, ctx->cs); |
338 | /* Send the end marker */ |
339 | WriteInt(AH, 0); |
340 | } |
341 | |
342 | /* |
343 | * Called by the archiver when starting to save all BLOB DATA (not schema). |
344 | * This routine should save whatever format-specific information is needed |
345 | * to read the BLOBs back into memory. |
346 | * |
347 | * It is called just prior to the dumper's DataDumper routine. |
348 | * |
349 | * Optional, but strongly recommended. |
350 | */ |
351 | static void |
352 | _StartBlobs(ArchiveHandle *AH, TocEntry *te) |
353 | { |
354 | lclContext *ctx = (lclContext *) AH->formatData; |
355 | lclTocEntry *tctx = (lclTocEntry *) te->formatData; |
356 | |
357 | tctx->dataPos = _getFilePos(AH, ctx); |
358 | tctx->dataState = K_OFFSET_POS_SET; |
359 | |
360 | _WriteByte(AH, BLK_BLOBS); /* Block type */ |
361 | WriteInt(AH, te->dumpId); /* For sanity check */ |
362 | } |
363 | |
364 | /* |
365 | * Called by the archiver when the dumper calls StartBlob. |
366 | * |
367 | * Mandatory. |
368 | * |
369 | * Must save the passed OID for retrieval at restore-time. |
370 | */ |
371 | static void |
372 | _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) |
373 | { |
374 | lclContext *ctx = (lclContext *) AH->formatData; |
375 | |
376 | if (oid == 0) |
377 | fatal("invalid OID for large object" ); |
378 | |
379 | WriteInt(AH, oid); |
380 | |
381 | ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); |
382 | } |
383 | |
384 | /* |
385 | * Called by the archiver when the dumper calls EndBlob. |
386 | * |
387 | * Optional. |
388 | */ |
389 | static void |
390 | _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) |
391 | { |
392 | lclContext *ctx = (lclContext *) AH->formatData; |
393 | |
394 | EndCompressor(AH, ctx->cs); |
395 | /* Send the end marker */ |
396 | WriteInt(AH, 0); |
397 | } |
398 | |
399 | /* |
400 | * Called by the archiver when finishing saving all BLOB DATA. |
401 | * |
402 | * Optional. |
403 | */ |
404 | static void |
405 | _EndBlobs(ArchiveHandle *AH, TocEntry *te) |
406 | { |
407 | /* Write out a fake zero OID to mark end-of-blobs. */ |
408 | WriteInt(AH, 0); |
409 | } |
410 | |
411 | /* |
412 | * Print data for a given TOC entry |
413 | */ |
414 | static void |
415 | _PrintTocData(ArchiveHandle *AH, TocEntry *te) |
416 | { |
417 | lclContext *ctx = (lclContext *) AH->formatData; |
418 | lclTocEntry *tctx = (lclTocEntry *) te->formatData; |
419 | int blkType; |
420 | int id; |
421 | |
422 | if (tctx->dataState == K_OFFSET_NO_DATA) |
423 | return; |
424 | |
425 | if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET) |
426 | { |
427 | /* |
428 | * We cannot seek directly to the desired block. Instead, skip over |
429 | * block headers until we find the one we want. This could fail if we |
430 | * are asked to restore items out-of-order. |
431 | */ |
432 | _readBlockHeader(AH, &blkType, &id); |
433 | |
434 | while (blkType != EOF && id != te->dumpId) |
435 | { |
436 | switch (blkType) |
437 | { |
438 | case BLK_DATA: |
439 | _skipData(AH); |
440 | break; |
441 | |
442 | case BLK_BLOBS: |
443 | _skipBlobs(AH); |
444 | break; |
445 | |
446 | default: /* Always have a default */ |
447 | fatal("unrecognized data block type (%d) while searching archive" , |
448 | blkType); |
449 | break; |
450 | } |
451 | _readBlockHeader(AH, &blkType, &id); |
452 | } |
453 | } |
454 | else |
455 | { |
456 | /* We can just seek to the place we need to be. */ |
457 | if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0) |
458 | fatal("error during file seek: %m" ); |
459 | |
460 | _readBlockHeader(AH, &blkType, &id); |
461 | } |
462 | |
463 | /* Produce suitable failure message if we fell off end of file */ |
464 | if (blkType == EOF) |
465 | { |
466 | if (tctx->dataState == K_OFFSET_POS_NOT_SET) |
467 | fatal("could not find block ID %d in archive -- " |
468 | "possibly due to out-of-order restore request, " |
469 | "which cannot be handled due to lack of data offsets in archive" , |
470 | te->dumpId); |
471 | else if (!ctx->hasSeek) |
472 | fatal("could not find block ID %d in archive -- " |
473 | "possibly due to out-of-order restore request, " |
474 | "which cannot be handled due to non-seekable input file" , |
475 | te->dumpId); |
476 | else /* huh, the dataPos led us to EOF? */ |
477 | fatal("could not find block ID %d in archive -- " |
478 | "possibly corrupt archive" , |
479 | te->dumpId); |
480 | } |
481 | |
482 | /* Are we sane? */ |
483 | if (id != te->dumpId) |
484 | fatal("found unexpected block ID (%d) when reading data -- expected %d" , |
485 | id, te->dumpId); |
486 | |
487 | switch (blkType) |
488 | { |
489 | case BLK_DATA: |
490 | _PrintData(AH); |
491 | break; |
492 | |
493 | case BLK_BLOBS: |
494 | _LoadBlobs(AH, AH->public.ropt->dropSchema); |
495 | break; |
496 | |
497 | default: /* Always have a default */ |
498 | fatal("unrecognized data block type %d while restoring archive" , |
499 | blkType); |
500 | break; |
501 | } |
502 | } |
503 | |
504 | /* |
505 | * Print data from current file position. |
506 | */ |
507 | static void |
508 | _PrintData(ArchiveHandle *AH) |
509 | { |
510 | ReadDataFromArchive(AH, AH->compression, _CustomReadFunc); |
511 | } |
512 | |
513 | static void |
514 | _LoadBlobs(ArchiveHandle *AH, bool drop) |
515 | { |
516 | Oid oid; |
517 | |
518 | StartRestoreBlobs(AH); |
519 | |
520 | oid = ReadInt(AH); |
521 | while (oid != 0) |
522 | { |
523 | StartRestoreBlob(AH, oid, drop); |
524 | _PrintData(AH); |
525 | EndRestoreBlob(AH, oid); |
526 | oid = ReadInt(AH); |
527 | } |
528 | |
529 | EndRestoreBlobs(AH); |
530 | } |
531 | |
532 | /* |
533 | * Skip the BLOBs from the current file position. |
534 | * BLOBS are written sequentially as data blocks (see below). |
535 | * Each BLOB is preceded by it's original OID. |
536 | * A zero OID indicated the end of the BLOBS |
537 | */ |
538 | static void |
539 | _skipBlobs(ArchiveHandle *AH) |
540 | { |
541 | Oid oid; |
542 | |
543 | oid = ReadInt(AH); |
544 | while (oid != 0) |
545 | { |
546 | _skipData(AH); |
547 | oid = ReadInt(AH); |
548 | } |
549 | } |
550 | |
551 | /* |
552 | * Skip data from current file position. |
553 | * Data blocks are formatted as an integer length, followed by data. |
554 | * A zero length denoted the end of the block. |
555 | */ |
556 | static void |
557 | _skipData(ArchiveHandle *AH) |
558 | { |
559 | lclContext *ctx = (lclContext *) AH->formatData; |
560 | size_t blkLen; |
561 | char *buf = NULL; |
562 | int buflen = 0; |
563 | size_t cnt; |
564 | |
565 | blkLen = ReadInt(AH); |
566 | while (blkLen != 0) |
567 | { |
568 | if (blkLen > buflen) |
569 | { |
570 | if (buf) |
571 | free(buf); |
572 | buf = (char *) pg_malloc(blkLen); |
573 | buflen = blkLen; |
574 | } |
575 | if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen) |
576 | { |
577 | if (feof(AH->FH)) |
578 | fatal("could not read from input file: end of file" ); |
579 | else |
580 | fatal("could not read from input file: %m" ); |
581 | } |
582 | |
583 | ctx->filePos += blkLen; |
584 | |
585 | blkLen = ReadInt(AH); |
586 | } |
587 | |
588 | if (buf) |
589 | free(buf); |
590 | } |
591 | |
592 | /* |
593 | * Write a byte of data to the archive. |
594 | * |
595 | * Mandatory. |
596 | * |
597 | * Called by the archiver to do integer & byte output to the archive. |
598 | */ |
599 | static int |
600 | _WriteByte(ArchiveHandle *AH, const int i) |
601 | { |
602 | lclContext *ctx = (lclContext *) AH->formatData; |
603 | int res; |
604 | |
605 | if ((res = fputc(i, AH->FH)) == EOF) |
606 | WRITE_ERROR_EXIT; |
607 | ctx->filePos += 1; |
608 | |
609 | return 1; |
610 | } |
611 | |
612 | /* |
613 | * Read a byte of data from the archive. |
614 | * |
615 | * Mandatory |
616 | * |
617 | * Called by the archiver to read bytes & integers from the archive. |
618 | * EOF should be treated as a fatal error. |
619 | */ |
620 | static int |
621 | _ReadByte(ArchiveHandle *AH) |
622 | { |
623 | lclContext *ctx = (lclContext *) AH->formatData; |
624 | int res; |
625 | |
626 | res = getc(AH->FH); |
627 | if (res == EOF) |
628 | READ_ERROR_EXIT(AH->FH); |
629 | ctx->filePos += 1; |
630 | return res; |
631 | } |
632 | |
633 | /* |
634 | * Write a buffer of data to the archive. |
635 | * |
636 | * Mandatory. |
637 | * |
638 | * Called by the archiver to write a block of bytes to the archive. |
639 | */ |
640 | static void |
641 | _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len) |
642 | { |
643 | lclContext *ctx = (lclContext *) AH->formatData; |
644 | |
645 | if (fwrite(buf, 1, len, AH->FH) != len) |
646 | WRITE_ERROR_EXIT; |
647 | ctx->filePos += len; |
648 | |
649 | return; |
650 | } |
651 | |
652 | /* |
653 | * Read a block of bytes from the archive. |
654 | * |
655 | * Mandatory. |
656 | * |
657 | * Called by the archiver to read a block of bytes from the archive |
658 | */ |
659 | static void |
660 | _ReadBuf(ArchiveHandle *AH, void *buf, size_t len) |
661 | { |
662 | lclContext *ctx = (lclContext *) AH->formatData; |
663 | |
664 | if (fread(buf, 1, len, AH->FH) != len) |
665 | READ_ERROR_EXIT(AH->FH); |
666 | ctx->filePos += len; |
667 | |
668 | return; |
669 | } |
670 | |
671 | /* |
672 | * Close the archive. |
673 | * |
674 | * Mandatory. |
675 | * |
676 | * When writing the archive, this is the routine that actually starts |
677 | * the process of saving it to files. No data should be written prior |
678 | * to this point, since the user could sort the TOC after creating it. |
679 | * |
680 | * If an archive is to be written, this routine must call: |
681 | * WriteHead to save the archive header |
682 | * WriteToc to save the TOC entries |
683 | * WriteDataChunks to save all DATA & BLOBs. |
684 | * |
685 | */ |
686 | static void |
687 | _CloseArchive(ArchiveHandle *AH) |
688 | { |
689 | lclContext *ctx = (lclContext *) AH->formatData; |
690 | pgoff_t tpos; |
691 | |
692 | if (AH->mode == archModeWrite) |
693 | { |
694 | WriteHead(AH); |
695 | /* Remember TOC's seek position for use below */ |
696 | tpos = ftello(AH->FH); |
697 | if (tpos < 0 && ctx->hasSeek) |
698 | fatal("could not determine seek position in archive file: %m" ); |
699 | WriteToc(AH); |
700 | ctx->dataStart = _getFilePos(AH, ctx); |
701 | WriteDataChunks(AH, NULL); |
702 | |
703 | /* |
704 | * If possible, re-write the TOC in order to update the data offset |
705 | * information. This is not essential, as pg_restore can cope in most |
706 | * cases without it; but it can make pg_restore significantly faster |
707 | * in some situations (especially parallel restore). |
708 | */ |
709 | if (ctx->hasSeek && |
710 | fseeko(AH->FH, tpos, SEEK_SET) == 0) |
711 | WriteToc(AH); |
712 | } |
713 | |
714 | if (fclose(AH->FH) != 0) |
715 | fatal("could not close archive file: %m" ); |
716 | |
717 | /* Sync the output file if one is defined */ |
718 | if (AH->dosync && AH->mode == archModeWrite && AH->fSpec) |
719 | (void) fsync_fname(AH->fSpec, false); |
720 | |
721 | AH->FH = NULL; |
722 | } |
723 | |
724 | /* |
725 | * Reopen the archive's file handle. |
726 | * |
727 | * We close the original file handle, except on Windows. (The difference |
728 | * is because on Windows, this is used within a multithreading context, |
729 | * and we don't want a thread closing the parent file handle.) |
730 | */ |
731 | static void |
732 | _ReopenArchive(ArchiveHandle *AH) |
733 | { |
734 | lclContext *ctx = (lclContext *) AH->formatData; |
735 | pgoff_t tpos; |
736 | |
737 | if (AH->mode == archModeWrite) |
738 | fatal("can only reopen input archives" ); |
739 | |
740 | /* |
741 | * These two cases are user-facing errors since they represent unsupported |
742 | * (but not invalid) use-cases. Word the error messages appropriately. |
743 | */ |
744 | if (AH->fSpec == NULL || strcmp(AH->fSpec, "" ) == 0) |
745 | fatal("parallel restore from standard input is not supported" ); |
746 | if (!ctx->hasSeek) |
747 | fatal("parallel restore from non-seekable file is not supported" ); |
748 | |
749 | tpos = ftello(AH->FH); |
750 | if (tpos < 0) |
751 | fatal("could not determine seek position in archive file: %m" ); |
752 | |
753 | #ifndef WIN32 |
754 | if (fclose(AH->FH) != 0) |
755 | fatal("could not close archive file: %m" ); |
756 | #endif |
757 | |
758 | AH->FH = fopen(AH->fSpec, PG_BINARY_R); |
759 | if (!AH->FH) |
760 | fatal("could not open input file \"%s\": %m" , AH->fSpec); |
761 | |
762 | if (fseeko(AH->FH, tpos, SEEK_SET) != 0) |
763 | fatal("could not set seek position in archive file: %m" ); |
764 | } |
765 | |
766 | /* |
767 | * Prepare for parallel restore. |
768 | * |
769 | * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS |
770 | * TOC entries' dataLength fields with appropriate values to guide the |
771 | * ordering of restore jobs. The source of said data is format-dependent, |
772 | * as is the exact meaning of the values. |
773 | * |
774 | * A format module might also choose to do other setup here. |
775 | */ |
776 | static void |
777 | _PrepParallelRestore(ArchiveHandle *AH) |
778 | { |
779 | lclContext *ctx = (lclContext *) AH->formatData; |
780 | TocEntry *prev_te = NULL; |
781 | lclTocEntry *prev_tctx = NULL; |
782 | TocEntry *te; |
783 | |
784 | /* |
785 | * Knowing that the data items were dumped out in TOC order, we can |
786 | * reconstruct the length of each item as the delta to the start offset of |
787 | * the next data item. |
788 | */ |
789 | for (te = AH->toc->next; te != AH->toc; te = te->next) |
790 | { |
791 | lclTocEntry *tctx = (lclTocEntry *) te->formatData; |
792 | |
793 | /* |
794 | * Ignore entries without a known data offset; if we were unable to |
795 | * seek to rewrite the TOC when creating the archive, this'll be all |
796 | * of them, and we'll end up with no size estimates. |
797 | */ |
798 | if (tctx->dataState != K_OFFSET_POS_SET) |
799 | continue; |
800 | |
801 | /* Compute previous data item's length */ |
802 | if (prev_te) |
803 | { |
804 | if (tctx->dataPos > prev_tctx->dataPos) |
805 | prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos; |
806 | } |
807 | |
808 | prev_te = te; |
809 | prev_tctx = tctx; |
810 | } |
811 | |
812 | /* If OK to seek, we can determine the length of the last item */ |
813 | if (prev_te && ctx->hasSeek) |
814 | { |
815 | pgoff_t endpos; |
816 | |
817 | if (fseeko(AH->FH, 0, SEEK_END) != 0) |
818 | fatal("error during file seek: %m" ); |
819 | endpos = ftello(AH->FH); |
820 | if (endpos > prev_tctx->dataPos) |
821 | prev_te->dataLength = endpos - prev_tctx->dataPos; |
822 | } |
823 | } |
824 | |
825 | /* |
826 | * Clone format-specific fields during parallel restoration. |
827 | */ |
828 | static void |
829 | _Clone(ArchiveHandle *AH) |
830 | { |
831 | lclContext *ctx = (lclContext *) AH->formatData; |
832 | |
833 | AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext)); |
834 | memcpy(AH->formatData, ctx, sizeof(lclContext)); |
835 | ctx = (lclContext *) AH->formatData; |
836 | |
837 | /* sanity check, shouldn't happen */ |
838 | if (ctx->cs != NULL) |
839 | fatal("compressor active" ); |
840 | |
841 | /* |
842 | * Note: we do not make a local lo_buf because we expect at most one BLOBS |
843 | * entry per archive, so no parallelism is possible. Likewise, |
844 | * TOC-entry-local state isn't an issue because any one TOC entry is |
845 | * touched by just one worker child. |
846 | */ |
847 | } |
848 | |
849 | static void |
850 | _DeClone(ArchiveHandle *AH) |
851 | { |
852 | lclContext *ctx = (lclContext *) AH->formatData; |
853 | |
854 | free(ctx); |
855 | } |
856 | |
857 | /* |
858 | * This function is executed in the child of a parallel restore from a |
859 | * custom-format archive and restores the actual data for one TOC entry. |
860 | */ |
861 | static int |
862 | _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te) |
863 | { |
864 | return parallel_restore(AH, te); |
865 | } |
866 | |
867 | /*-------------------------------------------------- |
868 | * END OF FORMAT CALLBACKS |
869 | *-------------------------------------------------- |
870 | */ |
871 | |
872 | /* |
873 | * Get the current position in the archive file. |
874 | */ |
875 | static pgoff_t |
876 | _getFilePos(ArchiveHandle *AH, lclContext *ctx) |
877 | { |
878 | pgoff_t pos; |
879 | |
880 | if (ctx->hasSeek) |
881 | { |
882 | /* |
883 | * Prior to 1.7 (pg7.3) we relied on the internally maintained |
884 | * pointer. Now we rely on ftello() always, unless the file has been |
885 | * found to not support it. For debugging purposes, print a warning |
886 | * if the internal pointer disagrees, so that we're more likely to |
887 | * notice if something's broken about the internal position tracking. |
888 | */ |
889 | pos = ftello(AH->FH); |
890 | if (pos < 0) |
891 | fatal("could not determine seek position in archive file: %m" ); |
892 | |
893 | if (pos != ctx->filePos) |
894 | pg_log_warning("ftell mismatch with expected position -- ftell used" ); |
895 | } |
896 | else |
897 | pos = ctx->filePos; |
898 | return pos; |
899 | } |
900 | |
901 | /* |
902 | * Read a data block header. The format changed in V1.3, so we |
903 | * centralize the code here for simplicity. Returns *type = EOF |
904 | * if at EOF. |
905 | */ |
906 | static void |
907 | (ArchiveHandle *AH, int *type, int *id) |
908 | { |
909 | lclContext *ctx = (lclContext *) AH->formatData; |
910 | int byt; |
911 | |
912 | /* |
913 | * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside |
914 | * ReadInt rather than returning EOF. It doesn't seem worth jumping |
915 | * through hoops to deal with that case better, because no such files are |
916 | * likely to exist in the wild: only some 7.1 development versions of |
917 | * pg_dump ever generated such files. |
918 | */ |
919 | if (AH->version < K_VERS_1_3) |
920 | *type = BLK_DATA; |
921 | else |
922 | { |
923 | byt = getc(AH->FH); |
924 | *type = byt; |
925 | if (byt == EOF) |
926 | { |
927 | *id = 0; /* don't return an uninitialized value */ |
928 | return; |
929 | } |
930 | ctx->filePos += 1; |
931 | } |
932 | |
933 | *id = ReadInt(AH); |
934 | } |
935 | |
936 | /* |
937 | * Callback function for WriteDataToArchive. Writes one block of (compressed) |
938 | * data to the archive. |
939 | */ |
940 | static void |
941 | _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len) |
942 | { |
943 | /* never write 0-byte blocks (this should not happen) */ |
944 | if (len > 0) |
945 | { |
946 | WriteInt(AH, len); |
947 | _WriteBuf(AH, buf, len); |
948 | } |
949 | return; |
950 | } |
951 | |
952 | /* |
953 | * Callback function for ReadDataFromArchive. To keep things simple, we |
954 | * always read one compressed block at a time. |
955 | */ |
956 | static size_t |
957 | _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen) |
958 | { |
959 | size_t blkLen; |
960 | |
961 | /* Read length */ |
962 | blkLen = ReadInt(AH); |
963 | if (blkLen == 0) |
964 | return 0; |
965 | |
966 | /* If the caller's buffer is not large enough, allocate a bigger one */ |
967 | if (blkLen > *buflen) |
968 | { |
969 | free(*buf); |
970 | *buf = (char *) pg_malloc(blkLen); |
971 | *buflen = blkLen; |
972 | } |
973 | |
974 | /* exits app on read errors */ |
975 | _ReadBuf(AH, *buf, blkLen); |
976 | |
977 | return blkLen; |
978 | } |
979 | |