| 1 | /*------------------------------------------------------------------------- |
| 2 | * |
| 3 | * pg_backup_custom.c |
| 4 | * |
| 5 | * Implements the custom output format. |
| 6 | * |
| 7 | * The comments with the routines in this code are a good place to |
| 8 | * understand how to write a new format. |
| 9 | * |
| 10 | * See the headers to pg_restore for more details. |
| 11 | * |
| 12 | * Copyright (c) 2000, Philip Warner |
| 13 | * Rights are granted to use this software in any way so long |
| 14 | * as this notice is not removed. |
| 15 | * |
| 16 | * The author is not responsible for loss or damages that may |
| 17 | * and any liability will be limited to the time taken to fix any |
| 18 | * related bug. |
| 19 | * |
| 20 | * |
| 21 | * IDENTIFICATION |
| 22 | * src/bin/pg_dump/pg_backup_custom.c |
| 23 | * |
| 24 | *------------------------------------------------------------------------- |
| 25 | */ |
| 26 | #include "postgres_fe.h" |
| 27 | |
| 28 | #include "compress_io.h" |
| 29 | #include "parallel.h" |
| 30 | #include "pg_backup_utils.h" |
| 31 | #include "common/file_utils.h" |
| 32 | |
| 33 | |
/*--------
 * Routines in the format interface
 *--------
 */

/* Core callbacks installed into the ArchiveHandle by InitArchiveFmt_Custom */
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
static void _StartData(ArchiveHandle *AH, TocEntry *te);
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
static void _EndData(ArchiveHandle *AH, TocEntry *te);
static int	_WriteByte(ArchiveHandle *AH, const int i);
static int	_ReadByte(ArchiveHandle *);
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);

/* Restore-side helpers: emit a data stream, or skip over data/BLOBS blocks */
static void _PrintData(ArchiveHandle *AH);
static void _skipData(ArchiveHandle *AH);
static void _skipBlobs(ArchiveHandle *AH);

/* Large-object callbacks, plus the restore-side loader used by _PrintTocData */
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, bool drop);

/* Parallel-restore support */
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);

static int	_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
| 69 | |
/*
 * Per-archive private state for this format, hung off AH->formatData.
 */
typedef struct
{
	CompressorState *cs;		/* compressor for the data item currently
								 * being written (set by _StartData and
								 * _StartBlob) */
	int			hasSeek;		/* result of checkSeek() on AH->FH */
	pgoff_t		filePos;		/* byte position tracked by this file's own
								 * read/write routines */
	pgoff_t		dataStart;		/* file position just after the TOC */
} lclContext;

/*
 * Per-TOC-entry private state for this format, hung off te->formatData.
 */
typedef struct
{
	int			dataState;		/* K_OFFSET_* code saying whether dataPos is
								 * meaningful */
	pgoff_t		dataPos;		/* file offset of this entry's data block */
} lclTocEntry;
| 83 | |
| 84 | |
/*------
 * Static declarations
 *------
 */

/* Internal helpers (not part of the format-callback interface) */
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);

/* Block I/O callbacks handed to AllocateCompressor / ReadDataFromArchive */
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
| 94 | |
| 95 | |
| 96 | /* |
| 97 | * Init routine required by ALL formats. This is a global routine |
| 98 | * and should be declared in pg_backup_archiver.h |
| 99 | * |
| 100 | * It's task is to create any extra archive context (using AH->formatData), |
| 101 | * and to initialize the supported function pointers. |
| 102 | * |
| 103 | * It should also prepare whatever it's input source is for reading/writing, |
| 104 | * and in the case of a read mode connection, it should load the Header & TOC. |
| 105 | */ |
| 106 | void |
| 107 | InitArchiveFmt_Custom(ArchiveHandle *AH) |
| 108 | { |
| 109 | lclContext *ctx; |
| 110 | |
| 111 | /* Assuming static functions, this can be copied for each format. */ |
| 112 | AH->ArchiveEntryPtr = _ArchiveEntry; |
| 113 | AH->StartDataPtr = _StartData; |
| 114 | AH->WriteDataPtr = _WriteData; |
| 115 | AH->EndDataPtr = _EndData; |
| 116 | AH->WriteBytePtr = _WriteByte; |
| 117 | AH->ReadBytePtr = _ReadByte; |
| 118 | AH->WriteBufPtr = _WriteBuf; |
| 119 | AH->ReadBufPtr = _ReadBuf; |
| 120 | AH->ClosePtr = _CloseArchive; |
| 121 | AH->ReopenPtr = _ReopenArchive; |
| 122 | AH->PrintTocDataPtr = _PrintTocData; |
| 123 | AH->ReadExtraTocPtr = _ReadExtraToc; |
| 124 | AH->WriteExtraTocPtr = _WriteExtraToc; |
| 125 | AH->PrintExtraTocPtr = _PrintExtraToc; |
| 126 | |
| 127 | AH->StartBlobsPtr = _StartBlobs; |
| 128 | AH->StartBlobPtr = _StartBlob; |
| 129 | AH->EndBlobPtr = _EndBlob; |
| 130 | AH->EndBlobsPtr = _EndBlobs; |
| 131 | |
| 132 | AH->PrepParallelRestorePtr = _PrepParallelRestore; |
| 133 | AH->ClonePtr = _Clone; |
| 134 | AH->DeClonePtr = _DeClone; |
| 135 | |
| 136 | /* no parallel dump in the custom archive, only parallel restore */ |
| 137 | AH->WorkerJobDumpPtr = NULL; |
| 138 | AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom; |
| 139 | |
| 140 | /* Set up a private area. */ |
| 141 | ctx = (lclContext *) pg_malloc0(sizeof(lclContext)); |
| 142 | AH->formatData = (void *) ctx; |
| 143 | |
| 144 | /* Initialize LO buffering */ |
| 145 | AH->lo_buf_size = LOBBUFSIZE; |
| 146 | AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE); |
| 147 | |
| 148 | ctx->filePos = 0; |
| 149 | |
| 150 | /* |
| 151 | * Now open the file |
| 152 | */ |
| 153 | if (AH->mode == archModeWrite) |
| 154 | { |
| 155 | if (AH->fSpec && strcmp(AH->fSpec, "" ) != 0) |
| 156 | { |
| 157 | AH->FH = fopen(AH->fSpec, PG_BINARY_W); |
| 158 | if (!AH->FH) |
| 159 | fatal("could not open output file \"%s\": %m" , AH->fSpec); |
| 160 | } |
| 161 | else |
| 162 | { |
| 163 | AH->FH = stdout; |
| 164 | if (!AH->FH) |
| 165 | fatal("could not open output file: %m" ); |
| 166 | } |
| 167 | |
| 168 | ctx->hasSeek = checkSeek(AH->FH); |
| 169 | } |
| 170 | else |
| 171 | { |
| 172 | if (AH->fSpec && strcmp(AH->fSpec, "" ) != 0) |
| 173 | { |
| 174 | AH->FH = fopen(AH->fSpec, PG_BINARY_R); |
| 175 | if (!AH->FH) |
| 176 | fatal("could not open input file \"%s\": %m" , AH->fSpec); |
| 177 | } |
| 178 | else |
| 179 | { |
| 180 | AH->FH = stdin; |
| 181 | if (!AH->FH) |
| 182 | fatal("could not open input file: %m" ); |
| 183 | } |
| 184 | |
| 185 | ctx->hasSeek = checkSeek(AH->FH); |
| 186 | |
| 187 | ReadHead(AH); |
| 188 | ReadToc(AH); |
| 189 | ctx->dataStart = _getFilePos(AH, ctx); |
| 190 | } |
| 191 | |
| 192 | } |
| 193 | |
| 194 | /* |
| 195 | * Called by the Archiver when the dumper creates a new TOC entry. |
| 196 | * |
| 197 | * Optional. |
| 198 | * |
| 199 | * Set up extract format-related TOC data. |
| 200 | */ |
| 201 | static void |
| 202 | _ArchiveEntry(ArchiveHandle *AH, TocEntry *te) |
| 203 | { |
| 204 | lclTocEntry *ctx; |
| 205 | |
| 206 | ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry)); |
| 207 | if (te->dataDumper) |
| 208 | ctx->dataState = K_OFFSET_POS_NOT_SET; |
| 209 | else |
| 210 | ctx->dataState = K_OFFSET_NO_DATA; |
| 211 | |
| 212 | te->formatData = (void *) ctx; |
| 213 | } |
| 214 | |
| 215 | /* |
| 216 | * Called by the Archiver to save any extra format-related TOC entry |
| 217 | * data. |
| 218 | * |
| 219 | * Optional. |
| 220 | * |
| 221 | * Use the Archiver routines to write data - they are non-endian, and |
| 222 | * maintain other important file information. |
| 223 | */ |
| 224 | static void |
| 225 | (ArchiveHandle *AH, TocEntry *te) |
| 226 | { |
| 227 | lclTocEntry *ctx = (lclTocEntry *) te->formatData; |
| 228 | |
| 229 | WriteOffset(AH, ctx->dataPos, ctx->dataState); |
| 230 | } |
| 231 | |
| 232 | /* |
| 233 | * Called by the Archiver to read any extra format-related TOC data. |
| 234 | * |
| 235 | * Optional. |
| 236 | * |
| 237 | * Needs to match the order defined in _WriteExtraToc, and should also |
| 238 | * use the Archiver input routines. |
| 239 | */ |
| 240 | static void |
| 241 | (ArchiveHandle *AH, TocEntry *te) |
| 242 | { |
| 243 | lclTocEntry *ctx = (lclTocEntry *) te->formatData; |
| 244 | |
| 245 | if (ctx == NULL) |
| 246 | { |
| 247 | ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry)); |
| 248 | te->formatData = (void *) ctx; |
| 249 | } |
| 250 | |
| 251 | ctx->dataState = ReadOffset(AH, &(ctx->dataPos)); |
| 252 | |
| 253 | /* |
| 254 | * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't |
| 255 | * dump it at all. |
| 256 | */ |
| 257 | if (AH->version < K_VERS_1_7) |
| 258 | ReadInt(AH); |
| 259 | } |
| 260 | |
| 261 | /* |
| 262 | * Called by the Archiver when restoring an archive to output a comment |
| 263 | * that includes useful information about the TOC entry. |
| 264 | * |
| 265 | * Optional. |
| 266 | * |
| 267 | */ |
| 268 | static void |
| 269 | (ArchiveHandle *AH, TocEntry *te) |
| 270 | { |
| 271 | lclTocEntry *ctx = (lclTocEntry *) te->formatData; |
| 272 | |
| 273 | if (AH->public.verbose) |
| 274 | ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n" , |
| 275 | (int64) ctx->dataPos); |
| 276 | } |
| 277 | |
| 278 | /* |
| 279 | * Called by the archiver when saving TABLE DATA (not schema). This routine |
| 280 | * should save whatever format-specific information is needed to read |
| 281 | * the archive back. |
| 282 | * |
| 283 | * It is called just prior to the dumper's 'DataDumper' routine being called. |
| 284 | * |
| 285 | * Optional, but strongly recommended. |
| 286 | * |
| 287 | */ |
| 288 | static void |
| 289 | _StartData(ArchiveHandle *AH, TocEntry *te) |
| 290 | { |
| 291 | lclContext *ctx = (lclContext *) AH->formatData; |
| 292 | lclTocEntry *tctx = (lclTocEntry *) te->formatData; |
| 293 | |
| 294 | tctx->dataPos = _getFilePos(AH, ctx); |
| 295 | tctx->dataState = K_OFFSET_POS_SET; |
| 296 | |
| 297 | _WriteByte(AH, BLK_DATA); /* Block type */ |
| 298 | WriteInt(AH, te->dumpId); /* For sanity check */ |
| 299 | |
| 300 | ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); |
| 301 | } |
| 302 | |
| 303 | /* |
| 304 | * Called by archiver when dumper calls WriteData. This routine is |
| 305 | * called for both BLOB and TABLE data; it is the responsibility of |
| 306 | * the format to manage each kind of data using StartBlob/StartData. |
| 307 | * |
| 308 | * It should only be called from within a DataDumper routine. |
| 309 | * |
| 310 | * Mandatory. |
| 311 | */ |
| 312 | static void |
| 313 | _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) |
| 314 | { |
| 315 | lclContext *ctx = (lclContext *) AH->formatData; |
| 316 | CompressorState *cs = ctx->cs; |
| 317 | |
| 318 | if (dLen > 0) |
| 319 | /* WriteDataToArchive() internally throws write errors */ |
| 320 | WriteDataToArchive(AH, cs, data, dLen); |
| 321 | |
| 322 | return; |
| 323 | } |
| 324 | |
| 325 | /* |
| 326 | * Called by the archiver when a dumper's 'DataDumper' routine has |
| 327 | * finished. |
| 328 | * |
| 329 | * Optional. |
| 330 | * |
| 331 | */ |
| 332 | static void |
| 333 | _EndData(ArchiveHandle *AH, TocEntry *te) |
| 334 | { |
| 335 | lclContext *ctx = (lclContext *) AH->formatData; |
| 336 | |
| 337 | EndCompressor(AH, ctx->cs); |
| 338 | /* Send the end marker */ |
| 339 | WriteInt(AH, 0); |
| 340 | } |
| 341 | |
| 342 | /* |
| 343 | * Called by the archiver when starting to save all BLOB DATA (not schema). |
| 344 | * This routine should save whatever format-specific information is needed |
| 345 | * to read the BLOBs back into memory. |
| 346 | * |
| 347 | * It is called just prior to the dumper's DataDumper routine. |
| 348 | * |
| 349 | * Optional, but strongly recommended. |
| 350 | */ |
| 351 | static void |
| 352 | _StartBlobs(ArchiveHandle *AH, TocEntry *te) |
| 353 | { |
| 354 | lclContext *ctx = (lclContext *) AH->formatData; |
| 355 | lclTocEntry *tctx = (lclTocEntry *) te->formatData; |
| 356 | |
| 357 | tctx->dataPos = _getFilePos(AH, ctx); |
| 358 | tctx->dataState = K_OFFSET_POS_SET; |
| 359 | |
| 360 | _WriteByte(AH, BLK_BLOBS); /* Block type */ |
| 361 | WriteInt(AH, te->dumpId); /* For sanity check */ |
| 362 | } |
| 363 | |
| 364 | /* |
| 365 | * Called by the archiver when the dumper calls StartBlob. |
| 366 | * |
| 367 | * Mandatory. |
| 368 | * |
| 369 | * Must save the passed OID for retrieval at restore-time. |
| 370 | */ |
| 371 | static void |
| 372 | _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) |
| 373 | { |
| 374 | lclContext *ctx = (lclContext *) AH->formatData; |
| 375 | |
| 376 | if (oid == 0) |
| 377 | fatal("invalid OID for large object" ); |
| 378 | |
| 379 | WriteInt(AH, oid); |
| 380 | |
| 381 | ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); |
| 382 | } |
| 383 | |
| 384 | /* |
| 385 | * Called by the archiver when the dumper calls EndBlob. |
| 386 | * |
| 387 | * Optional. |
| 388 | */ |
| 389 | static void |
| 390 | _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) |
| 391 | { |
| 392 | lclContext *ctx = (lclContext *) AH->formatData; |
| 393 | |
| 394 | EndCompressor(AH, ctx->cs); |
| 395 | /* Send the end marker */ |
| 396 | WriteInt(AH, 0); |
| 397 | } |
| 398 | |
| 399 | /* |
| 400 | * Called by the archiver when finishing saving all BLOB DATA. |
| 401 | * |
| 402 | * Optional. |
| 403 | */ |
| 404 | static void |
| 405 | _EndBlobs(ArchiveHandle *AH, TocEntry *te) |
| 406 | { |
| 407 | /* Write out a fake zero OID to mark end-of-blobs. */ |
| 408 | WriteInt(AH, 0); |
| 409 | } |
| 410 | |
/*
 * Print the data for a given TOC entry.
 *
 * Positions the input at the entry's data block, then hands the block to
 * _PrintData (BLK_DATA) or _LoadBlobs (BLK_BLOBS).  There are two ways to
 * get there:
 *   - fseeko() to the recorded offset, when the file is seekable and the
 *     offset is known;
 *   - otherwise, read forward from the current position, skipping other
 *     blocks until the one with this entry's dump ID turns up.
 * Entries recorded as having no data are ignored.  Failing to find the
 * block, or finding an unexpected ID or block type, is fatal.
 */
static void
_PrintTocData(ArchiveHandle *AH, TocEntry *te)
{
	lclContext *ctx = (lclContext *) AH->formatData;
	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
	int			blkType;
	int			id;

	/* Nothing was dumped for this entry, so there is no block to find */
	if (tctx->dataState == K_OFFSET_NO_DATA)
		return;

	if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		/*
		 * We cannot seek directly to the desired block.  Instead, skip over
		 * block headers until we find the one we want.  Because this scan
		 * only moves forward, it could fail if we are asked to restore items
		 * out-of-order.
		 */
		_readBlockHeader(AH, &blkType, &id);

		while (blkType != EOF && id != te->dumpId)
		{
			switch (blkType)
			{
				case BLK_DATA:
					_skipData(AH);
					break;

				case BLK_BLOBS:
					_skipBlobs(AH);
					break;

				default:		/* Always have a default */
					fatal("unrecognized data block type (%d) while searching archive",
						  blkType);
					break;
			}
			/* Move on to the next block's header */
			_readBlockHeader(AH, &blkType, &id);
		}
	}
	else
	{
		/* We can just seek to the place we need to be. */
		if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
			fatal("error during file seek: %m");

		_readBlockHeader(AH, &blkType, &id);
	}

	/*
	 * Produce suitable failure message if we fell off end of file; the
	 * wording depends on why we had to scan (or why seeking led nowhere).
	 */
	if (blkType == EOF)
	{
		if (tctx->dataState == K_OFFSET_POS_NOT_SET)
			fatal("could not find block ID %d in archive -- "
				  "possibly due to out-of-order restore request, "
				  "which cannot be handled due to lack of data offsets in archive",
				  te->dumpId);
		else if (!ctx->hasSeek)
			fatal("could not find block ID %d in archive -- "
				  "possibly due to out-of-order restore request, "
				  "which cannot be handled due to non-seekable input file",
				  te->dumpId);
		else					/* huh, the dataPos led us to EOF? */
			fatal("could not find block ID %d in archive -- "
				  "possibly corrupt archive",
				  te->dumpId);
	}

	/* Are we sane?  The block we landed on must belong to this entry. */
	if (id != te->dumpId)
		fatal("found unexpected block ID (%d) when reading data -- expected %d",
			  id, te->dumpId);

	/* Dispatch on the block type to emit table data or large objects */
	switch (blkType)
	{
		case BLK_DATA:
			_PrintData(AH);
			break;

		case BLK_BLOBS:
			_LoadBlobs(AH, AH->public.ropt->dropSchema);
			break;

		default:				/* Always have a default */
			fatal("unrecognized data block type %d while restoring archive",
				  blkType);
			break;
	}
}
| 503 | |
| 504 | /* |
| 505 | * Print data from current file position. |
| 506 | */ |
| 507 | static void |
| 508 | _PrintData(ArchiveHandle *AH) |
| 509 | { |
| 510 | ReadDataFromArchive(AH, AH->compression, _CustomReadFunc); |
| 511 | } |
| 512 | |
| 513 | static void |
| 514 | _LoadBlobs(ArchiveHandle *AH, bool drop) |
| 515 | { |
| 516 | Oid oid; |
| 517 | |
| 518 | StartRestoreBlobs(AH); |
| 519 | |
| 520 | oid = ReadInt(AH); |
| 521 | while (oid != 0) |
| 522 | { |
| 523 | StartRestoreBlob(AH, oid, drop); |
| 524 | _PrintData(AH); |
| 525 | EndRestoreBlob(AH, oid); |
| 526 | oid = ReadInt(AH); |
| 527 | } |
| 528 | |
| 529 | EndRestoreBlobs(AH); |
| 530 | } |
| 531 | |
| 532 | /* |
| 533 | * Skip the BLOBs from the current file position. |
| 534 | * BLOBS are written sequentially as data blocks (see below). |
| 535 | * Each BLOB is preceded by it's original OID. |
| 536 | * A zero OID indicated the end of the BLOBS |
| 537 | */ |
| 538 | static void |
| 539 | _skipBlobs(ArchiveHandle *AH) |
| 540 | { |
| 541 | Oid oid; |
| 542 | |
| 543 | oid = ReadInt(AH); |
| 544 | while (oid != 0) |
| 545 | { |
| 546 | _skipData(AH); |
| 547 | oid = ReadInt(AH); |
| 548 | } |
| 549 | } |
| 550 | |
| 551 | /* |
| 552 | * Skip data from current file position. |
| 553 | * Data blocks are formatted as an integer length, followed by data. |
| 554 | * A zero length denoted the end of the block. |
| 555 | */ |
| 556 | static void |
| 557 | _skipData(ArchiveHandle *AH) |
| 558 | { |
| 559 | lclContext *ctx = (lclContext *) AH->formatData; |
| 560 | size_t blkLen; |
| 561 | char *buf = NULL; |
| 562 | int buflen = 0; |
| 563 | size_t cnt; |
| 564 | |
| 565 | blkLen = ReadInt(AH); |
| 566 | while (blkLen != 0) |
| 567 | { |
| 568 | if (blkLen > buflen) |
| 569 | { |
| 570 | if (buf) |
| 571 | free(buf); |
| 572 | buf = (char *) pg_malloc(blkLen); |
| 573 | buflen = blkLen; |
| 574 | } |
| 575 | if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen) |
| 576 | { |
| 577 | if (feof(AH->FH)) |
| 578 | fatal("could not read from input file: end of file" ); |
| 579 | else |
| 580 | fatal("could not read from input file: %m" ); |
| 581 | } |
| 582 | |
| 583 | ctx->filePos += blkLen; |
| 584 | |
| 585 | blkLen = ReadInt(AH); |
| 586 | } |
| 587 | |
| 588 | if (buf) |
| 589 | free(buf); |
| 590 | } |
| 591 | |
| 592 | /* |
| 593 | * Write a byte of data to the archive. |
| 594 | * |
| 595 | * Mandatory. |
| 596 | * |
| 597 | * Called by the archiver to do integer & byte output to the archive. |
| 598 | */ |
| 599 | static int |
| 600 | _WriteByte(ArchiveHandle *AH, const int i) |
| 601 | { |
| 602 | lclContext *ctx = (lclContext *) AH->formatData; |
| 603 | int res; |
| 604 | |
| 605 | if ((res = fputc(i, AH->FH)) == EOF) |
| 606 | WRITE_ERROR_EXIT; |
| 607 | ctx->filePos += 1; |
| 608 | |
| 609 | return 1; |
| 610 | } |
| 611 | |
| 612 | /* |
| 613 | * Read a byte of data from the archive. |
| 614 | * |
| 615 | * Mandatory |
| 616 | * |
| 617 | * Called by the archiver to read bytes & integers from the archive. |
| 618 | * EOF should be treated as a fatal error. |
| 619 | */ |
| 620 | static int |
| 621 | _ReadByte(ArchiveHandle *AH) |
| 622 | { |
| 623 | lclContext *ctx = (lclContext *) AH->formatData; |
| 624 | int res; |
| 625 | |
| 626 | res = getc(AH->FH); |
| 627 | if (res == EOF) |
| 628 | READ_ERROR_EXIT(AH->FH); |
| 629 | ctx->filePos += 1; |
| 630 | return res; |
| 631 | } |
| 632 | |
| 633 | /* |
| 634 | * Write a buffer of data to the archive. |
| 635 | * |
| 636 | * Mandatory. |
| 637 | * |
| 638 | * Called by the archiver to write a block of bytes to the archive. |
| 639 | */ |
| 640 | static void |
| 641 | _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len) |
| 642 | { |
| 643 | lclContext *ctx = (lclContext *) AH->formatData; |
| 644 | |
| 645 | if (fwrite(buf, 1, len, AH->FH) != len) |
| 646 | WRITE_ERROR_EXIT; |
| 647 | ctx->filePos += len; |
| 648 | |
| 649 | return; |
| 650 | } |
| 651 | |
| 652 | /* |
| 653 | * Read a block of bytes from the archive. |
| 654 | * |
| 655 | * Mandatory. |
| 656 | * |
| 657 | * Called by the archiver to read a block of bytes from the archive |
| 658 | */ |
| 659 | static void |
| 660 | _ReadBuf(ArchiveHandle *AH, void *buf, size_t len) |
| 661 | { |
| 662 | lclContext *ctx = (lclContext *) AH->formatData; |
| 663 | |
| 664 | if (fread(buf, 1, len, AH->FH) != len) |
| 665 | READ_ERROR_EXIT(AH->FH); |
| 666 | ctx->filePos += len; |
| 667 | |
| 668 | return; |
| 669 | } |
| 670 | |
| 671 | /* |
| 672 | * Close the archive. |
| 673 | * |
| 674 | * Mandatory. |
| 675 | * |
| 676 | * When writing the archive, this is the routine that actually starts |
| 677 | * the process of saving it to files. No data should be written prior |
| 678 | * to this point, since the user could sort the TOC after creating it. |
| 679 | * |
| 680 | * If an archive is to be written, this routine must call: |
| 681 | * WriteHead to save the archive header |
| 682 | * WriteToc to save the TOC entries |
| 683 | * WriteDataChunks to save all DATA & BLOBs. |
| 684 | * |
| 685 | */ |
| 686 | static void |
| 687 | _CloseArchive(ArchiveHandle *AH) |
| 688 | { |
| 689 | lclContext *ctx = (lclContext *) AH->formatData; |
| 690 | pgoff_t tpos; |
| 691 | |
| 692 | if (AH->mode == archModeWrite) |
| 693 | { |
| 694 | WriteHead(AH); |
| 695 | /* Remember TOC's seek position for use below */ |
| 696 | tpos = ftello(AH->FH); |
| 697 | if (tpos < 0 && ctx->hasSeek) |
| 698 | fatal("could not determine seek position in archive file: %m" ); |
| 699 | WriteToc(AH); |
| 700 | ctx->dataStart = _getFilePos(AH, ctx); |
| 701 | WriteDataChunks(AH, NULL); |
| 702 | |
| 703 | /* |
| 704 | * If possible, re-write the TOC in order to update the data offset |
| 705 | * information. This is not essential, as pg_restore can cope in most |
| 706 | * cases without it; but it can make pg_restore significantly faster |
| 707 | * in some situations (especially parallel restore). |
| 708 | */ |
| 709 | if (ctx->hasSeek && |
| 710 | fseeko(AH->FH, tpos, SEEK_SET) == 0) |
| 711 | WriteToc(AH); |
| 712 | } |
| 713 | |
| 714 | if (fclose(AH->FH) != 0) |
| 715 | fatal("could not close archive file: %m" ); |
| 716 | |
| 717 | /* Sync the output file if one is defined */ |
| 718 | if (AH->dosync && AH->mode == archModeWrite && AH->fSpec) |
| 719 | (void) fsync_fname(AH->fSpec, false); |
| 720 | |
| 721 | AH->FH = NULL; |
| 722 | } |
| 723 | |
| 724 | /* |
| 725 | * Reopen the archive's file handle. |
| 726 | * |
| 727 | * We close the original file handle, except on Windows. (The difference |
| 728 | * is because on Windows, this is used within a multithreading context, |
| 729 | * and we don't want a thread closing the parent file handle.) |
| 730 | */ |
| 731 | static void |
| 732 | _ReopenArchive(ArchiveHandle *AH) |
| 733 | { |
| 734 | lclContext *ctx = (lclContext *) AH->formatData; |
| 735 | pgoff_t tpos; |
| 736 | |
| 737 | if (AH->mode == archModeWrite) |
| 738 | fatal("can only reopen input archives" ); |
| 739 | |
| 740 | /* |
| 741 | * These two cases are user-facing errors since they represent unsupported |
| 742 | * (but not invalid) use-cases. Word the error messages appropriately. |
| 743 | */ |
| 744 | if (AH->fSpec == NULL || strcmp(AH->fSpec, "" ) == 0) |
| 745 | fatal("parallel restore from standard input is not supported" ); |
| 746 | if (!ctx->hasSeek) |
| 747 | fatal("parallel restore from non-seekable file is not supported" ); |
| 748 | |
| 749 | tpos = ftello(AH->FH); |
| 750 | if (tpos < 0) |
| 751 | fatal("could not determine seek position in archive file: %m" ); |
| 752 | |
| 753 | #ifndef WIN32 |
| 754 | if (fclose(AH->FH) != 0) |
| 755 | fatal("could not close archive file: %m" ); |
| 756 | #endif |
| 757 | |
| 758 | AH->FH = fopen(AH->fSpec, PG_BINARY_R); |
| 759 | if (!AH->FH) |
| 760 | fatal("could not open input file \"%s\": %m" , AH->fSpec); |
| 761 | |
| 762 | if (fseeko(AH->FH, tpos, SEEK_SET) != 0) |
| 763 | fatal("could not set seek position in archive file: %m" ); |
| 764 | } |
| 765 | |
| 766 | /* |
| 767 | * Prepare for parallel restore. |
| 768 | * |
| 769 | * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS |
| 770 | * TOC entries' dataLength fields with appropriate values to guide the |
| 771 | * ordering of restore jobs. The source of said data is format-dependent, |
| 772 | * as is the exact meaning of the values. |
| 773 | * |
| 774 | * A format module might also choose to do other setup here. |
| 775 | */ |
| 776 | static void |
| 777 | _PrepParallelRestore(ArchiveHandle *AH) |
| 778 | { |
| 779 | lclContext *ctx = (lclContext *) AH->formatData; |
| 780 | TocEntry *prev_te = NULL; |
| 781 | lclTocEntry *prev_tctx = NULL; |
| 782 | TocEntry *te; |
| 783 | |
| 784 | /* |
| 785 | * Knowing that the data items were dumped out in TOC order, we can |
| 786 | * reconstruct the length of each item as the delta to the start offset of |
| 787 | * the next data item. |
| 788 | */ |
| 789 | for (te = AH->toc->next; te != AH->toc; te = te->next) |
| 790 | { |
| 791 | lclTocEntry *tctx = (lclTocEntry *) te->formatData; |
| 792 | |
| 793 | /* |
| 794 | * Ignore entries without a known data offset; if we were unable to |
| 795 | * seek to rewrite the TOC when creating the archive, this'll be all |
| 796 | * of them, and we'll end up with no size estimates. |
| 797 | */ |
| 798 | if (tctx->dataState != K_OFFSET_POS_SET) |
| 799 | continue; |
| 800 | |
| 801 | /* Compute previous data item's length */ |
| 802 | if (prev_te) |
| 803 | { |
| 804 | if (tctx->dataPos > prev_tctx->dataPos) |
| 805 | prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos; |
| 806 | } |
| 807 | |
| 808 | prev_te = te; |
| 809 | prev_tctx = tctx; |
| 810 | } |
| 811 | |
| 812 | /* If OK to seek, we can determine the length of the last item */ |
| 813 | if (prev_te && ctx->hasSeek) |
| 814 | { |
| 815 | pgoff_t endpos; |
| 816 | |
| 817 | if (fseeko(AH->FH, 0, SEEK_END) != 0) |
| 818 | fatal("error during file seek: %m" ); |
| 819 | endpos = ftello(AH->FH); |
| 820 | if (endpos > prev_tctx->dataPos) |
| 821 | prev_te->dataLength = endpos - prev_tctx->dataPos; |
| 822 | } |
| 823 | } |
| 824 | |
| 825 | /* |
| 826 | * Clone format-specific fields during parallel restoration. |
| 827 | */ |
| 828 | static void |
| 829 | _Clone(ArchiveHandle *AH) |
| 830 | { |
| 831 | lclContext *ctx = (lclContext *) AH->formatData; |
| 832 | |
| 833 | AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext)); |
| 834 | memcpy(AH->formatData, ctx, sizeof(lclContext)); |
| 835 | ctx = (lclContext *) AH->formatData; |
| 836 | |
| 837 | /* sanity check, shouldn't happen */ |
| 838 | if (ctx->cs != NULL) |
| 839 | fatal("compressor active" ); |
| 840 | |
| 841 | /* |
| 842 | * Note: we do not make a local lo_buf because we expect at most one BLOBS |
| 843 | * entry per archive, so no parallelism is possible. Likewise, |
| 844 | * TOC-entry-local state isn't an issue because any one TOC entry is |
| 845 | * touched by just one worker child. |
| 846 | */ |
| 847 | } |
| 848 | |
| 849 | static void |
| 850 | _DeClone(ArchiveHandle *AH) |
| 851 | { |
| 852 | lclContext *ctx = (lclContext *) AH->formatData; |
| 853 | |
| 854 | free(ctx); |
| 855 | } |
| 856 | |
| 857 | /* |
| 858 | * This function is executed in the child of a parallel restore from a |
| 859 | * custom-format archive and restores the actual data for one TOC entry. |
| 860 | */ |
| 861 | static int |
| 862 | _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te) |
| 863 | { |
| 864 | return parallel_restore(AH, te); |
| 865 | } |
| 866 | |
| 867 | /*-------------------------------------------------- |
| 868 | * END OF FORMAT CALLBACKS |
| 869 | *-------------------------------------------------- |
| 870 | */ |
| 871 | |
| 872 | /* |
| 873 | * Get the current position in the archive file. |
| 874 | */ |
| 875 | static pgoff_t |
| 876 | _getFilePos(ArchiveHandle *AH, lclContext *ctx) |
| 877 | { |
| 878 | pgoff_t pos; |
| 879 | |
| 880 | if (ctx->hasSeek) |
| 881 | { |
| 882 | /* |
| 883 | * Prior to 1.7 (pg7.3) we relied on the internally maintained |
| 884 | * pointer. Now we rely on ftello() always, unless the file has been |
| 885 | * found to not support it. For debugging purposes, print a warning |
| 886 | * if the internal pointer disagrees, so that we're more likely to |
| 887 | * notice if something's broken about the internal position tracking. |
| 888 | */ |
| 889 | pos = ftello(AH->FH); |
| 890 | if (pos < 0) |
| 891 | fatal("could not determine seek position in archive file: %m" ); |
| 892 | |
| 893 | if (pos != ctx->filePos) |
| 894 | pg_log_warning("ftell mismatch with expected position -- ftell used" ); |
| 895 | } |
| 896 | else |
| 897 | pos = ctx->filePos; |
| 898 | return pos; |
| 899 | } |
| 900 | |
| 901 | /* |
| 902 | * Read a data block header. The format changed in V1.3, so we |
| 903 | * centralize the code here for simplicity. Returns *type = EOF |
| 904 | * if at EOF. |
| 905 | */ |
| 906 | static void |
| 907 | (ArchiveHandle *AH, int *type, int *id) |
| 908 | { |
| 909 | lclContext *ctx = (lclContext *) AH->formatData; |
| 910 | int byt; |
| 911 | |
| 912 | /* |
| 913 | * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside |
| 914 | * ReadInt rather than returning EOF. It doesn't seem worth jumping |
| 915 | * through hoops to deal with that case better, because no such files are |
| 916 | * likely to exist in the wild: only some 7.1 development versions of |
| 917 | * pg_dump ever generated such files. |
| 918 | */ |
| 919 | if (AH->version < K_VERS_1_3) |
| 920 | *type = BLK_DATA; |
| 921 | else |
| 922 | { |
| 923 | byt = getc(AH->FH); |
| 924 | *type = byt; |
| 925 | if (byt == EOF) |
| 926 | { |
| 927 | *id = 0; /* don't return an uninitialized value */ |
| 928 | return; |
| 929 | } |
| 930 | ctx->filePos += 1; |
| 931 | } |
| 932 | |
| 933 | *id = ReadInt(AH); |
| 934 | } |
| 935 | |
| 936 | /* |
| 937 | * Callback function for WriteDataToArchive. Writes one block of (compressed) |
| 938 | * data to the archive. |
| 939 | */ |
| 940 | static void |
| 941 | _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len) |
| 942 | { |
| 943 | /* never write 0-byte blocks (this should not happen) */ |
| 944 | if (len > 0) |
| 945 | { |
| 946 | WriteInt(AH, len); |
| 947 | _WriteBuf(AH, buf, len); |
| 948 | } |
| 949 | return; |
| 950 | } |
| 951 | |
| 952 | /* |
| 953 | * Callback function for ReadDataFromArchive. To keep things simple, we |
| 954 | * always read one compressed block at a time. |
| 955 | */ |
| 956 | static size_t |
| 957 | _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen) |
| 958 | { |
| 959 | size_t blkLen; |
| 960 | |
| 961 | /* Read length */ |
| 962 | blkLen = ReadInt(AH); |
| 963 | if (blkLen == 0) |
| 964 | return 0; |
| 965 | |
| 966 | /* If the caller's buffer is not large enough, allocate a bigger one */ |
| 967 | if (blkLen > *buflen) |
| 968 | { |
| 969 | free(*buf); |
| 970 | *buf = (char *) pg_malloc(blkLen); |
| 971 | *buflen = blkLen; |
| 972 | } |
| 973 | |
| 974 | /* exits app on read errors */ |
| 975 | _ReadBuf(AH, *buf, blkLen); |
| 976 | |
| 977 | return blkLen; |
| 978 | } |
| 979 | |