1/*-------------------------------------------------------------------------
2 *
3 * pg_backup_custom.c
4 *
5 * Implements the custom output format.
6 *
7 * The comments with the routines in this code are a good place to
8 * understand how to write a new format.
9 *
10 * See the headers to pg_restore for more details.
11 *
12 * Copyright (c) 2000, Philip Warner
13 * Rights are granted to use this software in any way so long
14 * as this notice is not removed.
15 *
16 * The author is not responsible for loss or damages that may
17 * and any liability will be limited to the time taken to fix any
18 * related bug.
19 *
20 *
21 * IDENTIFICATION
22 * src/bin/pg_dump/pg_backup_custom.c
23 *
24 *-------------------------------------------------------------------------
25 */
26#include "postgres_fe.h"
27
28#include "compress_io.h"
29#include "parallel.h"
30#include "pg_backup_utils.h"
31#include "common/file_utils.h"
32
33
34/*--------
35 * Routines in the format interface
36 *--------
37 */
38
39static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
40static void _StartData(ArchiveHandle *AH, TocEntry *te);
41static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
42static void _EndData(ArchiveHandle *AH, TocEntry *te);
43static int _WriteByte(ArchiveHandle *AH, const int i);
44static int _ReadByte(ArchiveHandle *);
45static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
46static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
47static void _CloseArchive(ArchiveHandle *AH);
48static void _ReopenArchive(ArchiveHandle *AH);
49static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
50static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
51static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
52static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
53
54static void _PrintData(ArchiveHandle *AH);
55static void _skipData(ArchiveHandle *AH);
56static void _skipBlobs(ArchiveHandle *AH);
57
58static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
59static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
60static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
61static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
62static void _LoadBlobs(ArchiveHandle *AH, bool drop);
63
64static void _PrepParallelRestore(ArchiveHandle *AH);
65static void _Clone(ArchiveHandle *AH);
66static void _DeClone(ArchiveHandle *AH);
67
68static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
69
70typedef struct
71{
72 CompressorState *cs;
73 int hasSeek;
74 pgoff_t filePos;
75 pgoff_t dataStart;
76} lclContext;
77
78typedef struct
79{
80 int dataState;
81 pgoff_t dataPos;
82} lclTocEntry;
83
84
85/*------
86 * Static declarations
87 *------
88 */
89static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
90static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
91
92static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
93static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
94
95
96/*
97 * Init routine required by ALL formats. This is a global routine
98 * and should be declared in pg_backup_archiver.h
99 *
100 * It's task is to create any extra archive context (using AH->formatData),
101 * and to initialize the supported function pointers.
102 *
103 * It should also prepare whatever it's input source is for reading/writing,
104 * and in the case of a read mode connection, it should load the Header & TOC.
105 */
106void
107InitArchiveFmt_Custom(ArchiveHandle *AH)
108{
109 lclContext *ctx;
110
111 /* Assuming static functions, this can be copied for each format. */
112 AH->ArchiveEntryPtr = _ArchiveEntry;
113 AH->StartDataPtr = _StartData;
114 AH->WriteDataPtr = _WriteData;
115 AH->EndDataPtr = _EndData;
116 AH->WriteBytePtr = _WriteByte;
117 AH->ReadBytePtr = _ReadByte;
118 AH->WriteBufPtr = _WriteBuf;
119 AH->ReadBufPtr = _ReadBuf;
120 AH->ClosePtr = _CloseArchive;
121 AH->ReopenPtr = _ReopenArchive;
122 AH->PrintTocDataPtr = _PrintTocData;
123 AH->ReadExtraTocPtr = _ReadExtraToc;
124 AH->WriteExtraTocPtr = _WriteExtraToc;
125 AH->PrintExtraTocPtr = _PrintExtraToc;
126
127 AH->StartBlobsPtr = _StartBlobs;
128 AH->StartBlobPtr = _StartBlob;
129 AH->EndBlobPtr = _EndBlob;
130 AH->EndBlobsPtr = _EndBlobs;
131
132 AH->PrepParallelRestorePtr = _PrepParallelRestore;
133 AH->ClonePtr = _Clone;
134 AH->DeClonePtr = _DeClone;
135
136 /* no parallel dump in the custom archive, only parallel restore */
137 AH->WorkerJobDumpPtr = NULL;
138 AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
139
140 /* Set up a private area. */
141 ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
142 AH->formatData = (void *) ctx;
143
144 /* Initialize LO buffering */
145 AH->lo_buf_size = LOBBUFSIZE;
146 AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
147
148 ctx->filePos = 0;
149
150 /*
151 * Now open the file
152 */
153 if (AH->mode == archModeWrite)
154 {
155 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
156 {
157 AH->FH = fopen(AH->fSpec, PG_BINARY_W);
158 if (!AH->FH)
159 fatal("could not open output file \"%s\": %m", AH->fSpec);
160 }
161 else
162 {
163 AH->FH = stdout;
164 if (!AH->FH)
165 fatal("could not open output file: %m");
166 }
167
168 ctx->hasSeek = checkSeek(AH->FH);
169 }
170 else
171 {
172 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
173 {
174 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
175 if (!AH->FH)
176 fatal("could not open input file \"%s\": %m", AH->fSpec);
177 }
178 else
179 {
180 AH->FH = stdin;
181 if (!AH->FH)
182 fatal("could not open input file: %m");
183 }
184
185 ctx->hasSeek = checkSeek(AH->FH);
186
187 ReadHead(AH);
188 ReadToc(AH);
189 ctx->dataStart = _getFilePos(AH, ctx);
190 }
191
192}
193
194/*
195 * Called by the Archiver when the dumper creates a new TOC entry.
196 *
197 * Optional.
198 *
199 * Set up extract format-related TOC data.
200*/
201static void
202_ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
203{
204 lclTocEntry *ctx;
205
206 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
207 if (te->dataDumper)
208 ctx->dataState = K_OFFSET_POS_NOT_SET;
209 else
210 ctx->dataState = K_OFFSET_NO_DATA;
211
212 te->formatData = (void *) ctx;
213}
214
215/*
216 * Called by the Archiver to save any extra format-related TOC entry
217 * data.
218 *
219 * Optional.
220 *
221 * Use the Archiver routines to write data - they are non-endian, and
222 * maintain other important file information.
223 */
224static void
225_WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
226{
227 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
228
229 WriteOffset(AH, ctx->dataPos, ctx->dataState);
230}
231
232/*
233 * Called by the Archiver to read any extra format-related TOC data.
234 *
235 * Optional.
236 *
237 * Needs to match the order defined in _WriteExtraToc, and should also
238 * use the Archiver input routines.
239 */
240static void
241_ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
242{
243 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
244
245 if (ctx == NULL)
246 {
247 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
248 te->formatData = (void *) ctx;
249 }
250
251 ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
252
253 /*
254 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
255 * dump it at all.
256 */
257 if (AH->version < K_VERS_1_7)
258 ReadInt(AH);
259}
260
261/*
262 * Called by the Archiver when restoring an archive to output a comment
263 * that includes useful information about the TOC entry.
264 *
265 * Optional.
266 *
267 */
268static void
269_PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
270{
271 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
272
273 if (AH->public.verbose)
274 ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
275 (int64) ctx->dataPos);
276}
277
278/*
279 * Called by the archiver when saving TABLE DATA (not schema). This routine
280 * should save whatever format-specific information is needed to read
281 * the archive back.
282 *
283 * It is called just prior to the dumper's 'DataDumper' routine being called.
284 *
285 * Optional, but strongly recommended.
286 *
287 */
288static void
289_StartData(ArchiveHandle *AH, TocEntry *te)
290{
291 lclContext *ctx = (lclContext *) AH->formatData;
292 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
293
294 tctx->dataPos = _getFilePos(AH, ctx);
295 tctx->dataState = K_OFFSET_POS_SET;
296
297 _WriteByte(AH, BLK_DATA); /* Block type */
298 WriteInt(AH, te->dumpId); /* For sanity check */
299
300 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
301}
302
303/*
304 * Called by archiver when dumper calls WriteData. This routine is
305 * called for both BLOB and TABLE data; it is the responsibility of
306 * the format to manage each kind of data using StartBlob/StartData.
307 *
308 * It should only be called from within a DataDumper routine.
309 *
310 * Mandatory.
311 */
312static void
313_WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
314{
315 lclContext *ctx = (lclContext *) AH->formatData;
316 CompressorState *cs = ctx->cs;
317
318 if (dLen > 0)
319 /* WriteDataToArchive() internally throws write errors */
320 WriteDataToArchive(AH, cs, data, dLen);
321
322 return;
323}
324
325/*
326 * Called by the archiver when a dumper's 'DataDumper' routine has
327 * finished.
328 *
329 * Optional.
330 *
331 */
332static void
333_EndData(ArchiveHandle *AH, TocEntry *te)
334{
335 lclContext *ctx = (lclContext *) AH->formatData;
336
337 EndCompressor(AH, ctx->cs);
338 /* Send the end marker */
339 WriteInt(AH, 0);
340}
341
342/*
343 * Called by the archiver when starting to save all BLOB DATA (not schema).
344 * This routine should save whatever format-specific information is needed
345 * to read the BLOBs back into memory.
346 *
347 * It is called just prior to the dumper's DataDumper routine.
348 *
349 * Optional, but strongly recommended.
350 */
351static void
352_StartBlobs(ArchiveHandle *AH, TocEntry *te)
353{
354 lclContext *ctx = (lclContext *) AH->formatData;
355 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
356
357 tctx->dataPos = _getFilePos(AH, ctx);
358 tctx->dataState = K_OFFSET_POS_SET;
359
360 _WriteByte(AH, BLK_BLOBS); /* Block type */
361 WriteInt(AH, te->dumpId); /* For sanity check */
362}
363
364/*
365 * Called by the archiver when the dumper calls StartBlob.
366 *
367 * Mandatory.
368 *
369 * Must save the passed OID for retrieval at restore-time.
370 */
371static void
372_StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
373{
374 lclContext *ctx = (lclContext *) AH->formatData;
375
376 if (oid == 0)
377 fatal("invalid OID for large object");
378
379 WriteInt(AH, oid);
380
381 ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
382}
383
384/*
385 * Called by the archiver when the dumper calls EndBlob.
386 *
387 * Optional.
388 */
389static void
390_EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
391{
392 lclContext *ctx = (lclContext *) AH->formatData;
393
394 EndCompressor(AH, ctx->cs);
395 /* Send the end marker */
396 WriteInt(AH, 0);
397}
398
399/*
400 * Called by the archiver when finishing saving all BLOB DATA.
401 *
402 * Optional.
403 */
404static void
405_EndBlobs(ArchiveHandle *AH, TocEntry *te)
406{
407 /* Write out a fake zero OID to mark end-of-blobs. */
408 WriteInt(AH, 0);
409}
410
411/*
412 * Print data for a given TOC entry
413 */
414static void
415_PrintTocData(ArchiveHandle *AH, TocEntry *te)
416{
417 lclContext *ctx = (lclContext *) AH->formatData;
418 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
419 int blkType;
420 int id;
421
422 if (tctx->dataState == K_OFFSET_NO_DATA)
423 return;
424
425 if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
426 {
427 /*
428 * We cannot seek directly to the desired block. Instead, skip over
429 * block headers until we find the one we want. This could fail if we
430 * are asked to restore items out-of-order.
431 */
432 _readBlockHeader(AH, &blkType, &id);
433
434 while (blkType != EOF && id != te->dumpId)
435 {
436 switch (blkType)
437 {
438 case BLK_DATA:
439 _skipData(AH);
440 break;
441
442 case BLK_BLOBS:
443 _skipBlobs(AH);
444 break;
445
446 default: /* Always have a default */
447 fatal("unrecognized data block type (%d) while searching archive",
448 blkType);
449 break;
450 }
451 _readBlockHeader(AH, &blkType, &id);
452 }
453 }
454 else
455 {
456 /* We can just seek to the place we need to be. */
457 if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
458 fatal("error during file seek: %m");
459
460 _readBlockHeader(AH, &blkType, &id);
461 }
462
463 /* Produce suitable failure message if we fell off end of file */
464 if (blkType == EOF)
465 {
466 if (tctx->dataState == K_OFFSET_POS_NOT_SET)
467 fatal("could not find block ID %d in archive -- "
468 "possibly due to out-of-order restore request, "
469 "which cannot be handled due to lack of data offsets in archive",
470 te->dumpId);
471 else if (!ctx->hasSeek)
472 fatal("could not find block ID %d in archive -- "
473 "possibly due to out-of-order restore request, "
474 "which cannot be handled due to non-seekable input file",
475 te->dumpId);
476 else /* huh, the dataPos led us to EOF? */
477 fatal("could not find block ID %d in archive -- "
478 "possibly corrupt archive",
479 te->dumpId);
480 }
481
482 /* Are we sane? */
483 if (id != te->dumpId)
484 fatal("found unexpected block ID (%d) when reading data -- expected %d",
485 id, te->dumpId);
486
487 switch (blkType)
488 {
489 case BLK_DATA:
490 _PrintData(AH);
491 break;
492
493 case BLK_BLOBS:
494 _LoadBlobs(AH, AH->public.ropt->dropSchema);
495 break;
496
497 default: /* Always have a default */
498 fatal("unrecognized data block type %d while restoring archive",
499 blkType);
500 break;
501 }
502}
503
504/*
505 * Print data from current file position.
506*/
507static void
508_PrintData(ArchiveHandle *AH)
509{
510 ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
511}
512
513static void
514_LoadBlobs(ArchiveHandle *AH, bool drop)
515{
516 Oid oid;
517
518 StartRestoreBlobs(AH);
519
520 oid = ReadInt(AH);
521 while (oid != 0)
522 {
523 StartRestoreBlob(AH, oid, drop);
524 _PrintData(AH);
525 EndRestoreBlob(AH, oid);
526 oid = ReadInt(AH);
527 }
528
529 EndRestoreBlobs(AH);
530}
531
532/*
533 * Skip the BLOBs from the current file position.
534 * BLOBS are written sequentially as data blocks (see below).
535 * Each BLOB is preceded by it's original OID.
536 * A zero OID indicated the end of the BLOBS
537 */
538static void
539_skipBlobs(ArchiveHandle *AH)
540{
541 Oid oid;
542
543 oid = ReadInt(AH);
544 while (oid != 0)
545 {
546 _skipData(AH);
547 oid = ReadInt(AH);
548 }
549}
550
551/*
552 * Skip data from current file position.
553 * Data blocks are formatted as an integer length, followed by data.
554 * A zero length denoted the end of the block.
555*/
556static void
557_skipData(ArchiveHandle *AH)
558{
559 lclContext *ctx = (lclContext *) AH->formatData;
560 size_t blkLen;
561 char *buf = NULL;
562 int buflen = 0;
563 size_t cnt;
564
565 blkLen = ReadInt(AH);
566 while (blkLen != 0)
567 {
568 if (blkLen > buflen)
569 {
570 if (buf)
571 free(buf);
572 buf = (char *) pg_malloc(blkLen);
573 buflen = blkLen;
574 }
575 if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
576 {
577 if (feof(AH->FH))
578 fatal("could not read from input file: end of file");
579 else
580 fatal("could not read from input file: %m");
581 }
582
583 ctx->filePos += blkLen;
584
585 blkLen = ReadInt(AH);
586 }
587
588 if (buf)
589 free(buf);
590}
591
592/*
593 * Write a byte of data to the archive.
594 *
595 * Mandatory.
596 *
597 * Called by the archiver to do integer & byte output to the archive.
598 */
599static int
600_WriteByte(ArchiveHandle *AH, const int i)
601{
602 lclContext *ctx = (lclContext *) AH->formatData;
603 int res;
604
605 if ((res = fputc(i, AH->FH)) == EOF)
606 WRITE_ERROR_EXIT;
607 ctx->filePos += 1;
608
609 return 1;
610}
611
612/*
613 * Read a byte of data from the archive.
614 *
615 * Mandatory
616 *
617 * Called by the archiver to read bytes & integers from the archive.
618 * EOF should be treated as a fatal error.
619 */
620static int
621_ReadByte(ArchiveHandle *AH)
622{
623 lclContext *ctx = (lclContext *) AH->formatData;
624 int res;
625
626 res = getc(AH->FH);
627 if (res == EOF)
628 READ_ERROR_EXIT(AH->FH);
629 ctx->filePos += 1;
630 return res;
631}
632
633/*
634 * Write a buffer of data to the archive.
635 *
636 * Mandatory.
637 *
638 * Called by the archiver to write a block of bytes to the archive.
639 */
640static void
641_WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
642{
643 lclContext *ctx = (lclContext *) AH->formatData;
644
645 if (fwrite(buf, 1, len, AH->FH) != len)
646 WRITE_ERROR_EXIT;
647 ctx->filePos += len;
648
649 return;
650}
651
652/*
653 * Read a block of bytes from the archive.
654 *
655 * Mandatory.
656 *
657 * Called by the archiver to read a block of bytes from the archive
658 */
659static void
660_ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
661{
662 lclContext *ctx = (lclContext *) AH->formatData;
663
664 if (fread(buf, 1, len, AH->FH) != len)
665 READ_ERROR_EXIT(AH->FH);
666 ctx->filePos += len;
667
668 return;
669}
670
671/*
672 * Close the archive.
673 *
674 * Mandatory.
675 *
676 * When writing the archive, this is the routine that actually starts
677 * the process of saving it to files. No data should be written prior
678 * to this point, since the user could sort the TOC after creating it.
679 *
680 * If an archive is to be written, this routine must call:
681 * WriteHead to save the archive header
682 * WriteToc to save the TOC entries
683 * WriteDataChunks to save all DATA & BLOBs.
684 *
685 */
686static void
687_CloseArchive(ArchiveHandle *AH)
688{
689 lclContext *ctx = (lclContext *) AH->formatData;
690 pgoff_t tpos;
691
692 if (AH->mode == archModeWrite)
693 {
694 WriteHead(AH);
695 /* Remember TOC's seek position for use below */
696 tpos = ftello(AH->FH);
697 if (tpos < 0 && ctx->hasSeek)
698 fatal("could not determine seek position in archive file: %m");
699 WriteToc(AH);
700 ctx->dataStart = _getFilePos(AH, ctx);
701 WriteDataChunks(AH, NULL);
702
703 /*
704 * If possible, re-write the TOC in order to update the data offset
705 * information. This is not essential, as pg_restore can cope in most
706 * cases without it; but it can make pg_restore significantly faster
707 * in some situations (especially parallel restore).
708 */
709 if (ctx->hasSeek &&
710 fseeko(AH->FH, tpos, SEEK_SET) == 0)
711 WriteToc(AH);
712 }
713
714 if (fclose(AH->FH) != 0)
715 fatal("could not close archive file: %m");
716
717 /* Sync the output file if one is defined */
718 if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
719 (void) fsync_fname(AH->fSpec, false);
720
721 AH->FH = NULL;
722}
723
724/*
725 * Reopen the archive's file handle.
726 *
727 * We close the original file handle, except on Windows. (The difference
728 * is because on Windows, this is used within a multithreading context,
729 * and we don't want a thread closing the parent file handle.)
730 */
731static void
732_ReopenArchive(ArchiveHandle *AH)
733{
734 lclContext *ctx = (lclContext *) AH->formatData;
735 pgoff_t tpos;
736
737 if (AH->mode == archModeWrite)
738 fatal("can only reopen input archives");
739
740 /*
741 * These two cases are user-facing errors since they represent unsupported
742 * (but not invalid) use-cases. Word the error messages appropriately.
743 */
744 if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
745 fatal("parallel restore from standard input is not supported");
746 if (!ctx->hasSeek)
747 fatal("parallel restore from non-seekable file is not supported");
748
749 tpos = ftello(AH->FH);
750 if (tpos < 0)
751 fatal("could not determine seek position in archive file: %m");
752
753#ifndef WIN32
754 if (fclose(AH->FH) != 0)
755 fatal("could not close archive file: %m");
756#endif
757
758 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
759 if (!AH->FH)
760 fatal("could not open input file \"%s\": %m", AH->fSpec);
761
762 if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
763 fatal("could not set seek position in archive file: %m");
764}
765
766/*
767 * Prepare for parallel restore.
768 *
769 * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
770 * TOC entries' dataLength fields with appropriate values to guide the
771 * ordering of restore jobs. The source of said data is format-dependent,
772 * as is the exact meaning of the values.
773 *
774 * A format module might also choose to do other setup here.
775 */
776static void
777_PrepParallelRestore(ArchiveHandle *AH)
778{
779 lclContext *ctx = (lclContext *) AH->formatData;
780 TocEntry *prev_te = NULL;
781 lclTocEntry *prev_tctx = NULL;
782 TocEntry *te;
783
784 /*
785 * Knowing that the data items were dumped out in TOC order, we can
786 * reconstruct the length of each item as the delta to the start offset of
787 * the next data item.
788 */
789 for (te = AH->toc->next; te != AH->toc; te = te->next)
790 {
791 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
792
793 /*
794 * Ignore entries without a known data offset; if we were unable to
795 * seek to rewrite the TOC when creating the archive, this'll be all
796 * of them, and we'll end up with no size estimates.
797 */
798 if (tctx->dataState != K_OFFSET_POS_SET)
799 continue;
800
801 /* Compute previous data item's length */
802 if (prev_te)
803 {
804 if (tctx->dataPos > prev_tctx->dataPos)
805 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
806 }
807
808 prev_te = te;
809 prev_tctx = tctx;
810 }
811
812 /* If OK to seek, we can determine the length of the last item */
813 if (prev_te && ctx->hasSeek)
814 {
815 pgoff_t endpos;
816
817 if (fseeko(AH->FH, 0, SEEK_END) != 0)
818 fatal("error during file seek: %m");
819 endpos = ftello(AH->FH);
820 if (endpos > prev_tctx->dataPos)
821 prev_te->dataLength = endpos - prev_tctx->dataPos;
822 }
823}
824
825/*
826 * Clone format-specific fields during parallel restoration.
827 */
828static void
829_Clone(ArchiveHandle *AH)
830{
831 lclContext *ctx = (lclContext *) AH->formatData;
832
833 AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
834 memcpy(AH->formatData, ctx, sizeof(lclContext));
835 ctx = (lclContext *) AH->formatData;
836
837 /* sanity check, shouldn't happen */
838 if (ctx->cs != NULL)
839 fatal("compressor active");
840
841 /*
842 * Note: we do not make a local lo_buf because we expect at most one BLOBS
843 * entry per archive, so no parallelism is possible. Likewise,
844 * TOC-entry-local state isn't an issue because any one TOC entry is
845 * touched by just one worker child.
846 */
847}
848
849static void
850_DeClone(ArchiveHandle *AH)
851{
852 lclContext *ctx = (lclContext *) AH->formatData;
853
854 free(ctx);
855}
856
857/*
858 * This function is executed in the child of a parallel restore from a
859 * custom-format archive and restores the actual data for one TOC entry.
860 */
861static int
862_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
863{
864 return parallel_restore(AH, te);
865}
866
867/*--------------------------------------------------
868 * END OF FORMAT CALLBACKS
869 *--------------------------------------------------
870 */
871
872/*
873 * Get the current position in the archive file.
874 */
875static pgoff_t
876_getFilePos(ArchiveHandle *AH, lclContext *ctx)
877{
878 pgoff_t pos;
879
880 if (ctx->hasSeek)
881 {
882 /*
883 * Prior to 1.7 (pg7.3) we relied on the internally maintained
884 * pointer. Now we rely on ftello() always, unless the file has been
885 * found to not support it. For debugging purposes, print a warning
886 * if the internal pointer disagrees, so that we're more likely to
887 * notice if something's broken about the internal position tracking.
888 */
889 pos = ftello(AH->FH);
890 if (pos < 0)
891 fatal("could not determine seek position in archive file: %m");
892
893 if (pos != ctx->filePos)
894 pg_log_warning("ftell mismatch with expected position -- ftell used");
895 }
896 else
897 pos = ctx->filePos;
898 return pos;
899}
900
901/*
902 * Read a data block header. The format changed in V1.3, so we
903 * centralize the code here for simplicity. Returns *type = EOF
904 * if at EOF.
905 */
906static void
907_readBlockHeader(ArchiveHandle *AH, int *type, int *id)
908{
909 lclContext *ctx = (lclContext *) AH->formatData;
910 int byt;
911
912 /*
913 * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
914 * ReadInt rather than returning EOF. It doesn't seem worth jumping
915 * through hoops to deal with that case better, because no such files are
916 * likely to exist in the wild: only some 7.1 development versions of
917 * pg_dump ever generated such files.
918 */
919 if (AH->version < K_VERS_1_3)
920 *type = BLK_DATA;
921 else
922 {
923 byt = getc(AH->FH);
924 *type = byt;
925 if (byt == EOF)
926 {
927 *id = 0; /* don't return an uninitialized value */
928 return;
929 }
930 ctx->filePos += 1;
931 }
932
933 *id = ReadInt(AH);
934}
935
936/*
937 * Callback function for WriteDataToArchive. Writes one block of (compressed)
938 * data to the archive.
939 */
940static void
941_CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
942{
943 /* never write 0-byte blocks (this should not happen) */
944 if (len > 0)
945 {
946 WriteInt(AH, len);
947 _WriteBuf(AH, buf, len);
948 }
949 return;
950}
951
952/*
953 * Callback function for ReadDataFromArchive. To keep things simple, we
954 * always read one compressed block at a time.
955 */
956static size_t
957_CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
958{
959 size_t blkLen;
960
961 /* Read length */
962 blkLen = ReadInt(AH);
963 if (blkLen == 0)
964 return 0;
965
966 /* If the caller's buffer is not large enough, allocate a bigger one */
967 if (blkLen > *buflen)
968 {
969 free(*buf);
970 *buf = (char *) pg_malloc(blkLen);
971 *buflen = blkLen;
972 }
973
974 /* exits app on read errors */
975 _ReadBuf(AH, *buf, blkLen);
976
977 return blkLen;
978}
979