1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * walmethods.c - implementations of different ways to write received wal |
4 | * |
5 | * NOTE! The caller must ensure that only one method is instantiated in |
6 | * any given program, and that it's only instantiated once! |
7 | * |
8 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
9 | * |
10 | * IDENTIFICATION |
11 | * src/bin/pg_basebackup/walmethods.c |
12 | *------------------------------------------------------------------------- |
13 | */ |
14 | |
15 | #include "postgres_fe.h" |
16 | |
17 | #include <sys/stat.h> |
18 | #include <time.h> |
19 | #include <unistd.h> |
20 | #ifdef HAVE_LIBZ |
21 | #include <zlib.h> |
22 | #endif |
23 | |
24 | #include "pgtar.h" |
25 | #include "common/file_perm.h" |
26 | #include "common/file_utils.h" |
27 | |
28 | #include "receivelog.h" |
29 | #include "streamutil.h" |
30 | |
31 | /* Size of zlib buffer for .tar.gz */ |
32 | #define ZLIB_OUT_SIZE 4096 |
33 | |
34 | /*------------------------------------------------------------------------- |
35 | * WalDirectoryMethod - write wal to a directory looking like pg_wal |
36 | *------------------------------------------------------------------------- |
37 | */ |
38 | |
/*
 * Settings shared by every file handled through the directory method.
 * One instance is allocated by CreateWalDirectoryMethod() and reached
 * through the file-level pointer dir_data below.
 */
typedef struct DirectoryMethodData
{
    char       *basedir;        /* directory prefix used to build file paths */
    int         compression;    /* > 0 means files are written through zlib
                                 * and get a ".gz" suffix */
    bool        sync;           /* when true, files and directories are
                                 * fsync'ed */
} DirectoryMethodData;

/* The directory-method state; NULL until CreateWalDirectoryMethod() runs. */
static DirectoryMethodData *dir_data = NULL;
49 | |
/*
 * Per-file handle returned by dir_open_for_write() and consumed by the
 * other dir_* callbacks.
 */
typedef struct DirectoryMethodFile
{
    int         fd;             /* descriptor from open(); kept for compressed
                                 * files too so dir_sync() can fsync it */
    off_t       currpos;        /* running total of bytes accepted by
                                 * dir_write() */
    char       *pathname;       /* copy of the caller-supplied file name */
    char       *fullpath;       /* copy of the full path the file was opened
                                 * under */
    char       *temp_suffix;    /* copy of the temporary suffix, or NULL */
#ifdef HAVE_LIBZ
    gzFile      gzfp;           /* zlib handle layered on fd when compression
                                 * is enabled */
#endif
} DirectoryMethodFile;
64 | |
/*
 * Return a text description of the most recent directory-method failure.
 * The directory method reports failures through errno, so the message is
 * simply strerror() of the current errno value.
 */
static const char *
dir_getlasterror(void)
{
    const char *message = strerror(errno);

    return message;
}
71 | |
72 | static Walfile |
73 | dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) |
74 | { |
75 | static char tmppath[MAXPGPATH]; |
76 | int fd; |
77 | DirectoryMethodFile *f; |
78 | #ifdef HAVE_LIBZ |
79 | gzFile gzfp = NULL; |
80 | #endif |
81 | |
82 | snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s" , |
83 | dir_data->basedir, pathname, |
84 | dir_data->compression > 0 ? ".gz" : "" , |
85 | temp_suffix ? temp_suffix : "" ); |
86 | |
87 | /* |
88 | * Open a file for non-compressed as well as compressed files. Tracking |
89 | * the file descriptor is important for dir_sync() method as gzflush() |
90 | * does not do any system calls to fsync() to make changes permanent on |
91 | * disk. |
92 | */ |
93 | fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode); |
94 | if (fd < 0) |
95 | return NULL; |
96 | |
97 | #ifdef HAVE_LIBZ |
98 | if (dir_data->compression > 0) |
99 | { |
100 | gzfp = gzdopen(fd, "wb" ); |
101 | if (gzfp == NULL) |
102 | { |
103 | close(fd); |
104 | return NULL; |
105 | } |
106 | |
107 | if (gzsetparams(gzfp, dir_data->compression, |
108 | Z_DEFAULT_STRATEGY) != Z_OK) |
109 | { |
110 | gzclose(gzfp); |
111 | return NULL; |
112 | } |
113 | } |
114 | #endif |
115 | |
116 | /* Do pre-padding on non-compressed files */ |
117 | if (pad_to_size && dir_data->compression == 0) |
118 | { |
119 | PGAlignedXLogBlock zerobuf; |
120 | int bytes; |
121 | |
122 | memset(zerobuf.data, 0, XLOG_BLCKSZ); |
123 | for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ) |
124 | { |
125 | errno = 0; |
126 | if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ) |
127 | { |
128 | int save_errno = errno; |
129 | |
130 | close(fd); |
131 | |
132 | /* |
133 | * If write didn't set errno, assume problem is no disk space. |
134 | */ |
135 | errno = save_errno ? save_errno : ENOSPC; |
136 | return NULL; |
137 | } |
138 | } |
139 | |
140 | if (lseek(fd, 0, SEEK_SET) != 0) |
141 | { |
142 | int save_errno = errno; |
143 | |
144 | close(fd); |
145 | errno = save_errno; |
146 | return NULL; |
147 | } |
148 | } |
149 | |
150 | /* |
151 | * fsync WAL file and containing directory, to ensure the file is |
152 | * persistently created and zeroed (if padded). That's particularly |
153 | * important when using synchronous mode, where the file is modified and |
154 | * fsynced in-place, without a directory fsync. |
155 | */ |
156 | if (dir_data->sync) |
157 | { |
158 | if (fsync_fname(tmppath, false) != 0 || |
159 | fsync_parent_path(tmppath) != 0) |
160 | { |
161 | #ifdef HAVE_LIBZ |
162 | if (dir_data->compression > 0) |
163 | gzclose(gzfp); |
164 | else |
165 | #endif |
166 | close(fd); |
167 | return NULL; |
168 | } |
169 | } |
170 | |
171 | f = pg_malloc0(sizeof(DirectoryMethodFile)); |
172 | #ifdef HAVE_LIBZ |
173 | if (dir_data->compression > 0) |
174 | f->gzfp = gzfp; |
175 | #endif |
176 | f->fd = fd; |
177 | f->currpos = 0; |
178 | f->pathname = pg_strdup(pathname); |
179 | f->fullpath = pg_strdup(tmppath); |
180 | if (temp_suffix) |
181 | f->temp_suffix = pg_strdup(temp_suffix); |
182 | |
183 | return f; |
184 | } |
185 | |
186 | static ssize_t |
187 | dir_write(Walfile f, const void *buf, size_t count) |
188 | { |
189 | ssize_t r; |
190 | DirectoryMethodFile *df = (DirectoryMethodFile *) f; |
191 | |
192 | Assert(f != NULL); |
193 | |
194 | #ifdef HAVE_LIBZ |
195 | if (dir_data->compression > 0) |
196 | r = (ssize_t) gzwrite(df->gzfp, buf, count); |
197 | else |
198 | #endif |
199 | r = write(df->fd, buf, count); |
200 | if (r > 0) |
201 | df->currpos += r; |
202 | return r; |
203 | } |
204 | |
205 | static off_t |
206 | dir_get_current_pos(Walfile f) |
207 | { |
208 | Assert(f != NULL); |
209 | |
210 | /* Use a cached value to prevent lots of reseeks */ |
211 | return ((DirectoryMethodFile *) f)->currpos; |
212 | } |
213 | |
214 | static int |
215 | dir_close(Walfile f, WalCloseMethod method) |
216 | { |
217 | int r; |
218 | DirectoryMethodFile *df = (DirectoryMethodFile *) f; |
219 | static char tmppath[MAXPGPATH]; |
220 | static char tmppath2[MAXPGPATH]; |
221 | |
222 | Assert(f != NULL); |
223 | |
224 | #ifdef HAVE_LIBZ |
225 | if (dir_data->compression > 0) |
226 | r = gzclose(df->gzfp); |
227 | else |
228 | #endif |
229 | r = close(df->fd); |
230 | |
231 | if (r == 0) |
232 | { |
233 | /* Build path to the current version of the file */ |
234 | if (method == CLOSE_NORMAL && df->temp_suffix) |
235 | { |
236 | /* |
237 | * If we have a temp prefix, normal operation is to rename the |
238 | * file. |
239 | */ |
240 | snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s" , |
241 | dir_data->basedir, df->pathname, |
242 | dir_data->compression > 0 ? ".gz" : "" , |
243 | df->temp_suffix); |
244 | snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s" , |
245 | dir_data->basedir, df->pathname, |
246 | dir_data->compression > 0 ? ".gz" : "" ); |
247 | r = durable_rename(tmppath, tmppath2); |
248 | } |
249 | else if (method == CLOSE_UNLINK) |
250 | { |
251 | /* Unlink the file once it's closed */ |
252 | snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s" , |
253 | dir_data->basedir, df->pathname, |
254 | dir_data->compression > 0 ? ".gz" : "" , |
255 | df->temp_suffix ? df->temp_suffix : "" ); |
256 | r = unlink(tmppath); |
257 | } |
258 | else |
259 | { |
260 | /* |
261 | * Else either CLOSE_NORMAL and no temp suffix, or |
262 | * CLOSE_NO_RENAME. In this case, fsync the file and containing |
263 | * directory if sync mode is requested. |
264 | */ |
265 | if (dir_data->sync) |
266 | { |
267 | r = fsync_fname(df->fullpath, false); |
268 | if (r == 0) |
269 | r = fsync_parent_path(df->fullpath); |
270 | } |
271 | } |
272 | } |
273 | |
274 | pg_free(df->pathname); |
275 | pg_free(df->fullpath); |
276 | if (df->temp_suffix) |
277 | pg_free(df->temp_suffix); |
278 | pg_free(df); |
279 | |
280 | return r; |
281 | } |
282 | |
283 | static int |
284 | dir_sync(Walfile f) |
285 | { |
286 | Assert(f != NULL); |
287 | |
288 | if (!dir_data->sync) |
289 | return 0; |
290 | |
291 | #ifdef HAVE_LIBZ |
292 | if (dir_data->compression > 0) |
293 | { |
294 | if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK) |
295 | return -1; |
296 | } |
297 | #endif |
298 | |
299 | return fsync(((DirectoryMethodFile *) f)->fd); |
300 | } |
301 | |
302 | static ssize_t |
303 | dir_get_file_size(const char *pathname) |
304 | { |
305 | struct stat statbuf; |
306 | static char tmppath[MAXPGPATH]; |
307 | |
308 | snprintf(tmppath, sizeof(tmppath), "%s/%s" , |
309 | dir_data->basedir, pathname); |
310 | |
311 | if (stat(tmppath, &statbuf) != 0) |
312 | return -1; |
313 | |
314 | return statbuf.st_size; |
315 | } |
316 | |
317 | static bool |
318 | dir_existsfile(const char *pathname) |
319 | { |
320 | static char tmppath[MAXPGPATH]; |
321 | int fd; |
322 | |
323 | snprintf(tmppath, sizeof(tmppath), "%s/%s" , |
324 | dir_data->basedir, pathname); |
325 | |
326 | fd = open(tmppath, O_RDONLY | PG_BINARY, 0); |
327 | if (fd < 0) |
328 | return false; |
329 | close(fd); |
330 | return true; |
331 | } |
332 | |
333 | static bool |
334 | dir_finish(void) |
335 | { |
336 | if (dir_data->sync) |
337 | { |
338 | /* |
339 | * Files are fsynced when they are closed, but we need to fsync the |
340 | * directory entry here as well. |
341 | */ |
342 | if (fsync_fname(dir_data->basedir, true) != 0) |
343 | return false; |
344 | } |
345 | return true; |
346 | } |
347 | |
348 | |
349 | WalWriteMethod * |
350 | CreateWalDirectoryMethod(const char *basedir, int compression, bool sync) |
351 | { |
352 | WalWriteMethod *method; |
353 | |
354 | method = pg_malloc0(sizeof(WalWriteMethod)); |
355 | method->open_for_write = dir_open_for_write; |
356 | method->write = dir_write; |
357 | method->get_current_pos = dir_get_current_pos; |
358 | method->get_file_size = dir_get_file_size; |
359 | method->close = dir_close; |
360 | method->sync = dir_sync; |
361 | method->existsfile = dir_existsfile; |
362 | method->finish = dir_finish; |
363 | method->getlasterror = dir_getlasterror; |
364 | |
365 | dir_data = pg_malloc0(sizeof(DirectoryMethodData)); |
366 | dir_data->compression = compression; |
367 | dir_data->basedir = pg_strdup(basedir); |
368 | dir_data->sync = sync; |
369 | |
370 | return method; |
371 | } |
372 | |
373 | void |
374 | FreeWalDirectoryMethod(void) |
375 | { |
376 | pg_free(dir_data->basedir); |
377 | pg_free(dir_data); |
378 | } |
379 | |
380 | |
381 | /*------------------------------------------------------------------------- |
382 | * WalTarMethod - write wal to a tar file containing pg_wal contents |
383 | *------------------------------------------------------------------------- |
384 | */ |
385 | |
/*
 * State of the single file currently being written into the tar archive.
 */
typedef struct TarMethodFile
{
    off_t       ofs_start;      /* Where does the *header* for this file start */
    off_t       currpos;        /* bytes of file content written so far */
    char        header[512];    /* tar header block; rewritten on close with
                                 * the final size and checksum */
    char       *pathname;       /* copy of the name passed to
                                 * tar_open_for_write() */
    size_t      pad_to_size;    /* requested padded size, or 0 for none */
} TarMethodFile;
394 | |
395 | typedef struct TarMethodData |
396 | { |
397 | char *tarfilename; |
398 | int fd; |
399 | int compression; |
400 | bool sync; |
401 | TarMethodFile *currentfile; |
402 | char lasterror[1024]; |
403 | #ifdef HAVE_LIBZ |
404 | z_streamp zp; |
405 | void *zlibOut; |
406 | #endif |
407 | } TarMethodData; |
408 | static TarMethodData *tar_data = NULL; |
409 | |
/*
 * Helpers for the tar method's custom error text.  Both expansions are
 * fully parenthesized so they behave as a single expression wherever they
 * are used.
 *
 * tar_clear_error()   empties tar_data->lasterror, so tar_getlasterror()
 *                     falls back to errno
 * tar_set_error(msg)  stores the translated message in tar_data->lasterror
 */
#define tar_clear_error() (tar_data->lasterror[0] = '\0')
#define tar_set_error(msg) (strlcpy(tar_data->lasterror, _(msg), sizeof(tar_data->lasterror)))
412 | |
413 | static const char * |
414 | tar_getlasterror(void) |
415 | { |
416 | /* |
417 | * If a custom error is set, return that one. Otherwise, assume errno is |
418 | * set and return that one. |
419 | */ |
420 | if (tar_data->lasterror[0]) |
421 | return tar_data->lasterror; |
422 | return strerror(errno); |
423 | } |
424 | |
#ifdef HAVE_LIBZ
/*
 * Feed count bytes from buf through the deflate stream and write whatever
 * compressed output becomes available to the archive.
 *
 * With flush set, deflate() is driven with Z_FINISH until it reports
 * Z_STREAM_END and the stream is then reset for further use; without it
 * the loop stops once all input has been consumed.
 *
 * Returns true on success.  On failure returns false with either a custom
 * error message set or errno describing a failed write (ENOSPC is assumed
 * if write() did not set errno).
 */
static bool
tar_write_compressed_data(void *buf, size_t count, bool flush)
{
    z_streamp   zs = tar_data->zp;
    const int   mode = flush ? Z_FINISH : Z_NO_FLUSH;

    zs->next_in = buf;
    zs->avail_in = count;

    for (;;)
    {
        int         rc;

        /* Without a flush request we are done when the input is drained. */
        if (!flush && zs->avail_in == 0)
            break;

        rc = deflate(zs, mode);
        if (rc == Z_STREAM_ERROR)
        {
            tar_set_error("could not compress data");
            return false;
        }

        if (zs->avail_out < ZLIB_OUT_SIZE)
        {
            size_t      pending = ZLIB_OUT_SIZE - zs->avail_out;

            errno = 0;
            if (write(tar_data->fd, tar_data->zlibOut, pending) != pending)
            {
                /*
                 * If write didn't set errno, assume problem is no disk space.
                 */
                if (errno == 0)
                    errno = ENOSPC;
                return false;
            }

            /* Give the emptied buffer back to zlib. */
            zs->next_out = tar_data->zlibOut;
            zs->avail_out = ZLIB_OUT_SIZE;
        }

        if (rc == Z_STREAM_END)
            break;
    }

    /* Reset the stream for writing */
    if (flush && deflateReset(zs) != Z_OK)
    {
        tar_set_error("could not reset compression stream");
        return false;
    }

    return true;
}
#endif
479 | |
480 | static ssize_t |
481 | tar_write(Walfile f, const void *buf, size_t count) |
482 | { |
483 | ssize_t r; |
484 | |
485 | Assert(f != NULL); |
486 | tar_clear_error(); |
487 | |
488 | /* Tarfile will always be positioned at the end */ |
489 | if (!tar_data->compression) |
490 | { |
491 | r = write(tar_data->fd, buf, count); |
492 | if (r > 0) |
493 | ((TarMethodFile *) f)->currpos += r; |
494 | return r; |
495 | } |
496 | #ifdef HAVE_LIBZ |
497 | else |
498 | { |
499 | if (!tar_write_compressed_data(unconstify(void *, buf), count, false)) |
500 | return -1; |
501 | ((TarMethodFile *) f)->currpos += count; |
502 | return count; |
503 | } |
504 | #else |
505 | else |
506 | /* Can't happen - compression enabled with no libz */ |
507 | return -1; |
508 | #endif |
509 | } |
510 | |
511 | static bool |
512 | tar_write_padding_data(TarMethodFile *f, size_t bytes) |
513 | { |
514 | PGAlignedXLogBlock zerobuf; |
515 | size_t bytesleft = bytes; |
516 | |
517 | memset(zerobuf.data, 0, XLOG_BLCKSZ); |
518 | while (bytesleft) |
519 | { |
520 | size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ); |
521 | ssize_t r = tar_write(f, zerobuf.data, bytestowrite); |
522 | |
523 | if (r < 0) |
524 | return false; |
525 | bytesleft -= r; |
526 | } |
527 | |
528 | return true; |
529 | } |
530 | |
531 | static Walfile |
532 | tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) |
533 | { |
534 | int save_errno; |
535 | static char tmppath[MAXPGPATH]; |
536 | |
537 | tar_clear_error(); |
538 | |
539 | if (tar_data->fd < 0) |
540 | { |
541 | /* |
542 | * We open the tar file only when we first try to write to it. |
543 | */ |
544 | tar_data->fd = open(tar_data->tarfilename, |
545 | O_WRONLY | O_CREAT | PG_BINARY, |
546 | pg_file_create_mode); |
547 | if (tar_data->fd < 0) |
548 | return NULL; |
549 | |
550 | #ifdef HAVE_LIBZ |
551 | if (tar_data->compression) |
552 | { |
553 | tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream)); |
554 | tar_data->zp->zalloc = Z_NULL; |
555 | tar_data->zp->zfree = Z_NULL; |
556 | tar_data->zp->opaque = Z_NULL; |
557 | tar_data->zp->next_out = tar_data->zlibOut; |
558 | tar_data->zp->avail_out = ZLIB_OUT_SIZE; |
559 | |
560 | /* |
561 | * Initialize deflation library. Adding the magic value 16 to the |
562 | * default 15 for the windowBits parameter makes the output be |
563 | * gzip instead of zlib. |
564 | */ |
565 | if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) |
566 | { |
567 | pg_free(tar_data->zp); |
568 | tar_data->zp = NULL; |
569 | tar_set_error("could not initialize compression library" ); |
570 | return NULL; |
571 | } |
572 | } |
573 | #endif |
574 | |
575 | /* There's no tar header itself, the file starts with regular files */ |
576 | } |
577 | |
578 | Assert(tar_data->currentfile == NULL); |
579 | if (tar_data->currentfile != NULL) |
580 | { |
581 | tar_set_error("implementation error: tar files can't have more than one open file" ); |
582 | return NULL; |
583 | } |
584 | |
585 | tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile)); |
586 | |
587 | snprintf(tmppath, sizeof(tmppath), "%s%s" , |
588 | pathname, temp_suffix ? temp_suffix : "" ); |
589 | |
590 | /* Create a header with size set to 0 - we will fill out the size on close */ |
591 | if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK) |
592 | { |
593 | pg_free(tar_data->currentfile); |
594 | tar_data->currentfile = NULL; |
595 | tar_set_error("could not create tar header" ); |
596 | return NULL; |
597 | } |
598 | |
599 | #ifdef HAVE_LIBZ |
600 | if (tar_data->compression) |
601 | { |
602 | /* Flush existing data */ |
603 | if (!tar_write_compressed_data(NULL, 0, true)) |
604 | return NULL; |
605 | |
606 | /* Turn off compression for header */ |
607 | if (deflateParams(tar_data->zp, 0, 0) != Z_OK) |
608 | { |
609 | tar_set_error("could not change compression parameters" ); |
610 | return NULL; |
611 | } |
612 | } |
613 | #endif |
614 | |
615 | tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR); |
616 | if (tar_data->currentfile->ofs_start == -1) |
617 | { |
618 | save_errno = errno; |
619 | pg_free(tar_data->currentfile); |
620 | tar_data->currentfile = NULL; |
621 | errno = save_errno; |
622 | return NULL; |
623 | } |
624 | tar_data->currentfile->currpos = 0; |
625 | |
626 | if (!tar_data->compression) |
627 | { |
628 | errno = 0; |
629 | if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512) |
630 | { |
631 | save_errno = errno; |
632 | pg_free(tar_data->currentfile); |
633 | tar_data->currentfile = NULL; |
634 | /* if write didn't set errno, assume problem is no disk space */ |
635 | errno = save_errno ? save_errno : ENOSPC; |
636 | return NULL; |
637 | } |
638 | } |
639 | #ifdef HAVE_LIBZ |
640 | else |
641 | { |
642 | /* Write header through the zlib APIs but with no compression */ |
643 | if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true)) |
644 | return NULL; |
645 | |
646 | /* Re-enable compression for the rest of the file */ |
647 | if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK) |
648 | { |
649 | tar_set_error("could not change compression parameters" ); |
650 | return NULL; |
651 | } |
652 | } |
653 | #endif |
654 | |
655 | tar_data->currentfile->pathname = pg_strdup(pathname); |
656 | |
657 | /* |
658 | * Uncompressed files are padded on creation, but for compression we can't |
659 | * do that |
660 | */ |
661 | if (pad_to_size) |
662 | { |
663 | tar_data->currentfile->pad_to_size = pad_to_size; |
664 | if (!tar_data->compression) |
665 | { |
666 | /* Uncompressed, so pad now */ |
667 | tar_write_padding_data(tar_data->currentfile, pad_to_size); |
668 | /* Seek back to start */ |
669 | if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512) |
670 | return NULL; |
671 | |
672 | tar_data->currentfile->currpos = 0; |
673 | } |
674 | } |
675 | |
676 | return tar_data->currentfile; |
677 | } |
678 | |
/*
 * Size lookup is not implemented for the tar method: the custom error is
 * cleared, errno is set to ENOSYS and -1 is returned for every pathname.
 */
static ssize_t
tar_get_file_size(const char *pathname)
{
    const ssize_t not_supported = -1;

    tar_clear_error();

    /* Currently not used, so not supported */
    errno = ENOSYS;
    return not_supported;
}
688 | |
689 | static off_t |
690 | tar_get_current_pos(Walfile f) |
691 | { |
692 | Assert(f != NULL); |
693 | tar_clear_error(); |
694 | |
695 | return ((TarMethodFile *) f)->currpos; |
696 | } |
697 | |
698 | static int |
699 | tar_sync(Walfile f) |
700 | { |
701 | Assert(f != NULL); |
702 | tar_clear_error(); |
703 | |
704 | if (!tar_data->sync) |
705 | return 0; |
706 | |
707 | /* |
708 | * Always sync the whole tarfile, because that's all we can do. This makes |
709 | * no sense on compressed files, so just ignore those. |
710 | */ |
711 | if (tar_data->compression) |
712 | return 0; |
713 | |
714 | return fsync(tar_data->fd); |
715 | } |
716 | |
717 | static int |
718 | tar_close(Walfile f, WalCloseMethod method) |
719 | { |
720 | ssize_t filesize; |
721 | int padding; |
722 | TarMethodFile *tf = (TarMethodFile *) f; |
723 | |
724 | Assert(f != NULL); |
725 | tar_clear_error(); |
726 | |
727 | if (method == CLOSE_UNLINK) |
728 | { |
729 | if (tar_data->compression) |
730 | { |
731 | tar_set_error("unlink not supported with compression" ); |
732 | return -1; |
733 | } |
734 | |
735 | /* |
736 | * Unlink the file that we just wrote to the tar. We do this by |
737 | * truncating it to the start of the header. This is safe as we only |
738 | * allow writing of the very last file. |
739 | */ |
740 | if (ftruncate(tar_data->fd, tf->ofs_start) != 0) |
741 | return -1; |
742 | |
743 | pg_free(tf->pathname); |
744 | pg_free(tf); |
745 | tar_data->currentfile = NULL; |
746 | |
747 | return 0; |
748 | } |
749 | |
750 | /* |
751 | * Pad the file itself with zeroes if necessary. Note that this is |
752 | * different from the tar format padding -- this is the padding we asked |
753 | * for when the file was opened. |
754 | */ |
755 | if (tf->pad_to_size) |
756 | { |
757 | if (tar_data->compression) |
758 | { |
759 | /* |
760 | * A compressed tarfile is padded on close since we cannot know |
761 | * the size of the compressed output until the end. |
762 | */ |
763 | size_t sizeleft = tf->pad_to_size - tf->currpos; |
764 | |
765 | if (sizeleft) |
766 | { |
767 | if (!tar_write_padding_data(tf, sizeleft)) |
768 | return -1; |
769 | } |
770 | } |
771 | else |
772 | { |
773 | /* |
774 | * An uncompressed tarfile was padded on creation, so just adjust |
775 | * the current position as if we seeked to the end. |
776 | */ |
777 | tf->currpos = tf->pad_to_size; |
778 | } |
779 | } |
780 | |
781 | /* |
782 | * Get the size of the file, and pad the current data up to the nearest |
783 | * 512 byte boundary. |
784 | */ |
785 | filesize = tar_get_current_pos(f); |
786 | padding = ((filesize + 511) & ~511) - filesize; |
787 | if (padding) |
788 | { |
789 | char zerobuf[512]; |
790 | |
791 | MemSet(zerobuf, 0, padding); |
792 | if (tar_write(f, zerobuf, padding) != padding) |
793 | return -1; |
794 | } |
795 | |
796 | |
797 | #ifdef HAVE_LIBZ |
798 | if (tar_data->compression) |
799 | { |
800 | /* Flush the current buffer */ |
801 | if (!tar_write_compressed_data(NULL, 0, true)) |
802 | { |
803 | errno = EINVAL; |
804 | return -1; |
805 | } |
806 | } |
807 | #endif |
808 | |
809 | /* |
810 | * Now go back and update the header with the correct filesize and |
811 | * possibly also renaming the file. We overwrite the entire current header |
812 | * when done, including the checksum. |
813 | */ |
814 | print_tar_number(&(tf->header[124]), 12, filesize); |
815 | |
816 | if (method == CLOSE_NORMAL) |
817 | |
818 | /* |
819 | * We overwrite it with what it was before if we have no tempname, |
820 | * since we're going to write the buffer anyway. |
821 | */ |
822 | strlcpy(&(tf->header[0]), tf->pathname, 100); |
823 | |
824 | print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header)); |
825 | if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start) |
826 | return -1; |
827 | if (!tar_data->compression) |
828 | { |
829 | errno = 0; |
830 | if (write(tar_data->fd, tf->header, 512) != 512) |
831 | { |
832 | /* if write didn't set errno, assume problem is no disk space */ |
833 | if (errno == 0) |
834 | errno = ENOSPC; |
835 | return -1; |
836 | } |
837 | } |
838 | #ifdef HAVE_LIBZ |
839 | else |
840 | { |
841 | /* Turn off compression */ |
842 | if (deflateParams(tar_data->zp, 0, 0) != Z_OK) |
843 | { |
844 | tar_set_error("could not change compression parameters" ); |
845 | return -1; |
846 | } |
847 | |
848 | /* Overwrite the header, assuming the size will be the same */ |
849 | if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true)) |
850 | return -1; |
851 | |
852 | /* Turn compression back on */ |
853 | if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK) |
854 | { |
855 | tar_set_error("could not change compression parameters" ); |
856 | return -1; |
857 | } |
858 | } |
859 | #endif |
860 | |
861 | /* Move file pointer back down to end, so we can write the next file */ |
862 | if (lseek(tar_data->fd, 0, SEEK_END) < 0) |
863 | return -1; |
864 | |
865 | /* Always fsync on close, so the padding gets fsynced */ |
866 | if (tar_sync(f) < 0) |
867 | return -1; |
868 | |
869 | /* Clean up and done */ |
870 | pg_free(tf->pathname); |
871 | pg_free(tf); |
872 | tar_data->currentfile = NULL; |
873 | |
874 | return 0; |
875 | } |
876 | |
/*
 * Existence check for the tar method.  Archives are always created fresh,
 * so no pathname is ever reported as existing; the custom error is cleared
 * as a side effect.
 */
static bool
tar_existsfile(const char *pathname)
{
    const bool  found = false;

    tar_clear_error();

    return found;
}
884 | |
885 | static bool |
886 | tar_finish(void) |
887 | { |
888 | char zerobuf[1024]; |
889 | |
890 | tar_clear_error(); |
891 | |
892 | if (tar_data->currentfile) |
893 | { |
894 | if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0) |
895 | return false; |
896 | } |
897 | |
898 | /* A tarfile always ends with two empty blocks */ |
899 | MemSet(zerobuf, 0, sizeof(zerobuf)); |
900 | if (!tar_data->compression) |
901 | { |
902 | errno = 0; |
903 | if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf)) |
904 | { |
905 | /* if write didn't set errno, assume problem is no disk space */ |
906 | if (errno == 0) |
907 | errno = ENOSPC; |
908 | return false; |
909 | } |
910 | } |
911 | #ifdef HAVE_LIBZ |
912 | else |
913 | { |
914 | if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false)) |
915 | return false; |
916 | |
917 | /* Also flush all data to make sure the gzip stream is finished */ |
918 | tar_data->zp->next_in = NULL; |
919 | tar_data->zp->avail_in = 0; |
920 | while (true) |
921 | { |
922 | int r; |
923 | |
924 | r = deflate(tar_data->zp, Z_FINISH); |
925 | |
926 | if (r == Z_STREAM_ERROR) |
927 | { |
928 | tar_set_error("could not compress data" ); |
929 | return false; |
930 | } |
931 | if (tar_data->zp->avail_out < ZLIB_OUT_SIZE) |
932 | { |
933 | size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out; |
934 | |
935 | errno = 0; |
936 | if (write(tar_data->fd, tar_data->zlibOut, len) != len) |
937 | { |
938 | /* |
939 | * If write didn't set errno, assume problem is no disk |
940 | * space. |
941 | */ |
942 | if (errno == 0) |
943 | errno = ENOSPC; |
944 | return false; |
945 | } |
946 | } |
947 | if (r == Z_STREAM_END) |
948 | break; |
949 | } |
950 | |
951 | if (deflateEnd(tar_data->zp) != Z_OK) |
952 | { |
953 | tar_set_error("could not close compression stream" ); |
954 | return false; |
955 | } |
956 | } |
957 | #endif |
958 | |
959 | /* sync the empty blocks as well, since they're after the last file */ |
960 | if (tar_data->sync) |
961 | { |
962 | if (fsync(tar_data->fd) != 0) |
963 | return false; |
964 | } |
965 | |
966 | if (close(tar_data->fd) != 0) |
967 | return false; |
968 | |
969 | tar_data->fd = -1; |
970 | |
971 | if (tar_data->sync) |
972 | { |
973 | if (fsync_fname(tar_data->tarfilename, false) != 0) |
974 | return false; |
975 | if (fsync_parent_path(tar_data->tarfilename) != 0) |
976 | return false; |
977 | } |
978 | |
979 | return true; |
980 | } |
981 | |
982 | WalWriteMethod * |
983 | CreateWalTarMethod(const char *tarbase, int compression, bool sync) |
984 | { |
985 | WalWriteMethod *method; |
986 | const char *suffix = (compression != 0) ? ".tar.gz" : ".tar" ; |
987 | |
988 | method = pg_malloc0(sizeof(WalWriteMethod)); |
989 | method->open_for_write = tar_open_for_write; |
990 | method->write = tar_write; |
991 | method->get_current_pos = tar_get_current_pos; |
992 | method->get_file_size = tar_get_file_size; |
993 | method->close = tar_close; |
994 | method->sync = tar_sync; |
995 | method->existsfile = tar_existsfile; |
996 | method->finish = tar_finish; |
997 | method->getlasterror = tar_getlasterror; |
998 | |
999 | tar_data = pg_malloc0(sizeof(TarMethodData)); |
1000 | tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1); |
1001 | sprintf(tar_data->tarfilename, "%s%s" , tarbase, suffix); |
1002 | tar_data->fd = -1; |
1003 | tar_data->compression = compression; |
1004 | tar_data->sync = sync; |
1005 | #ifdef HAVE_LIBZ |
1006 | if (compression) |
1007 | tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1); |
1008 | #endif |
1009 | |
1010 | return method; |
1011 | } |
1012 | |
1013 | void |
1014 | FreeWalTarMethod(void) |
1015 | { |
1016 | pg_free(tar_data->tarfilename); |
1017 | #ifdef HAVE_LIBZ |
1018 | if (tar_data->compression) |
1019 | pg_free(tar_data->zlibOut); |
1020 | #endif |
1021 | pg_free(tar_data); |
1022 | } |
1023 | |