1/*-------------------------------------------------------------------------
2 *
3 * walmethods.c - implementations of different ways to write received wal
4 *
5 * NOTE! The caller must ensure that only one method is instantiated in
6 * any given program, and that it's only instantiated once!
7 *
8 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
9 *
10 * IDENTIFICATION
11 * src/bin/pg_basebackup/walmethods.c
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres_fe.h"
16
17#include <sys/stat.h>
18#include <time.h>
19#include <unistd.h>
20#ifdef HAVE_LIBZ
21#include <zlib.h>
22#endif
23
24#include "pgtar.h"
25#include "common/file_perm.h"
26#include "common/file_utils.h"
27
28#include "receivelog.h"
29#include "streamutil.h"
30
31/* Size of zlib buffer for .tar.gz */
32#define ZLIB_OUT_SIZE 4096
33
34/*-------------------------------------------------------------------------
35 * WalDirectoryMethod - write wal to a directory looking like pg_wal
36 *-------------------------------------------------------------------------
37 */
38
/*
 * Global static data for this method.
 *
 * Allocated and filled in by CreateWalDirectoryMethod() and released by
 * FreeWalDirectoryMethod(); all dir_* callbacks read their settings from it.
 */
typedef struct DirectoryMethodData
{
    char *basedir;      /* directory the WAL files are created in */
    int compression;    /* > 0: write gzip files at this zlib level; 0: plain files */
    bool sync;          /* fsync files and their directory as they are written */
} DirectoryMethodData;
static DirectoryMethodData *dir_data = NULL;
49
/*
 * Local file handle.
 *
 * This is what dir_open_for_write() hands back as the opaque Walfile, and
 * what dir_close() frees.
 */
typedef struct DirectoryMethodFile
{
    int fd;             /* descriptor from open(); kept for gzip files too so dir_sync() can fsync it */
    off_t currpos;      /* running total of bytes accepted by dir_write() */
    char *pathname;     /* copy of the caller's file name (relative to basedir) */
    char *fullpath;     /* copy of the path actually opened, incl. ".gz" and temp suffix */
    char *temp_suffix;  /* copy of the temp suffix, or NULL if none was given */
#ifdef HAVE_LIBZ
    gzFile gzfp;        /* zlib stream wrapped around fd; set only when compressing */
#endif
} DirectoryMethodFile;
64
/*
 * Return a human-readable description of the most recent failure.
 *
 * The directory method reports its failures through errno, so the message
 * is simply the system's text for the current errno value.
 */
static const char *
dir_getlasterror(void)
{
    const char *message = strerror(errno);

    return message;
}
71
72static Walfile
73dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
74{
75 static char tmppath[MAXPGPATH];
76 int fd;
77 DirectoryMethodFile *f;
78#ifdef HAVE_LIBZ
79 gzFile gzfp = NULL;
80#endif
81
82 snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
83 dir_data->basedir, pathname,
84 dir_data->compression > 0 ? ".gz" : "",
85 temp_suffix ? temp_suffix : "");
86
87 /*
88 * Open a file for non-compressed as well as compressed files. Tracking
89 * the file descriptor is important for dir_sync() method as gzflush()
90 * does not do any system calls to fsync() to make changes permanent on
91 * disk.
92 */
93 fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
94 if (fd < 0)
95 return NULL;
96
97#ifdef HAVE_LIBZ
98 if (dir_data->compression > 0)
99 {
100 gzfp = gzdopen(fd, "wb");
101 if (gzfp == NULL)
102 {
103 close(fd);
104 return NULL;
105 }
106
107 if (gzsetparams(gzfp, dir_data->compression,
108 Z_DEFAULT_STRATEGY) != Z_OK)
109 {
110 gzclose(gzfp);
111 return NULL;
112 }
113 }
114#endif
115
116 /* Do pre-padding on non-compressed files */
117 if (pad_to_size && dir_data->compression == 0)
118 {
119 PGAlignedXLogBlock zerobuf;
120 int bytes;
121
122 memset(zerobuf.data, 0, XLOG_BLCKSZ);
123 for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
124 {
125 errno = 0;
126 if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
127 {
128 int save_errno = errno;
129
130 close(fd);
131
132 /*
133 * If write didn't set errno, assume problem is no disk space.
134 */
135 errno = save_errno ? save_errno : ENOSPC;
136 return NULL;
137 }
138 }
139
140 if (lseek(fd, 0, SEEK_SET) != 0)
141 {
142 int save_errno = errno;
143
144 close(fd);
145 errno = save_errno;
146 return NULL;
147 }
148 }
149
150 /*
151 * fsync WAL file and containing directory, to ensure the file is
152 * persistently created and zeroed (if padded). That's particularly
153 * important when using synchronous mode, where the file is modified and
154 * fsynced in-place, without a directory fsync.
155 */
156 if (dir_data->sync)
157 {
158 if (fsync_fname(tmppath, false) != 0 ||
159 fsync_parent_path(tmppath) != 0)
160 {
161#ifdef HAVE_LIBZ
162 if (dir_data->compression > 0)
163 gzclose(gzfp);
164 else
165#endif
166 close(fd);
167 return NULL;
168 }
169 }
170
171 f = pg_malloc0(sizeof(DirectoryMethodFile));
172#ifdef HAVE_LIBZ
173 if (dir_data->compression > 0)
174 f->gzfp = gzfp;
175#endif
176 f->fd = fd;
177 f->currpos = 0;
178 f->pathname = pg_strdup(pathname);
179 f->fullpath = pg_strdup(tmppath);
180 if (temp_suffix)
181 f->temp_suffix = pg_strdup(temp_suffix);
182
183 return f;
184}
185
186static ssize_t
187dir_write(Walfile f, const void *buf, size_t count)
188{
189 ssize_t r;
190 DirectoryMethodFile *df = (DirectoryMethodFile *) f;
191
192 Assert(f != NULL);
193
194#ifdef HAVE_LIBZ
195 if (dir_data->compression > 0)
196 r = (ssize_t) gzwrite(df->gzfp, buf, count);
197 else
198#endif
199 r = write(df->fd, buf, count);
200 if (r > 0)
201 df->currpos += r;
202 return r;
203}
204
205static off_t
206dir_get_current_pos(Walfile f)
207{
208 Assert(f != NULL);
209
210 /* Use a cached value to prevent lots of reseeks */
211 return ((DirectoryMethodFile *) f)->currpos;
212}
213
214static int
215dir_close(Walfile f, WalCloseMethod method)
216{
217 int r;
218 DirectoryMethodFile *df = (DirectoryMethodFile *) f;
219 static char tmppath[MAXPGPATH];
220 static char tmppath2[MAXPGPATH];
221
222 Assert(f != NULL);
223
224#ifdef HAVE_LIBZ
225 if (dir_data->compression > 0)
226 r = gzclose(df->gzfp);
227 else
228#endif
229 r = close(df->fd);
230
231 if (r == 0)
232 {
233 /* Build path to the current version of the file */
234 if (method == CLOSE_NORMAL && df->temp_suffix)
235 {
236 /*
237 * If we have a temp prefix, normal operation is to rename the
238 * file.
239 */
240 snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
241 dir_data->basedir, df->pathname,
242 dir_data->compression > 0 ? ".gz" : "",
243 df->temp_suffix);
244 snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
245 dir_data->basedir, df->pathname,
246 dir_data->compression > 0 ? ".gz" : "");
247 r = durable_rename(tmppath, tmppath2);
248 }
249 else if (method == CLOSE_UNLINK)
250 {
251 /* Unlink the file once it's closed */
252 snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
253 dir_data->basedir, df->pathname,
254 dir_data->compression > 0 ? ".gz" : "",
255 df->temp_suffix ? df->temp_suffix : "");
256 r = unlink(tmppath);
257 }
258 else
259 {
260 /*
261 * Else either CLOSE_NORMAL and no temp suffix, or
262 * CLOSE_NO_RENAME. In this case, fsync the file and containing
263 * directory if sync mode is requested.
264 */
265 if (dir_data->sync)
266 {
267 r = fsync_fname(df->fullpath, false);
268 if (r == 0)
269 r = fsync_parent_path(df->fullpath);
270 }
271 }
272 }
273
274 pg_free(df->pathname);
275 pg_free(df->fullpath);
276 if (df->temp_suffix)
277 pg_free(df->temp_suffix);
278 pg_free(df);
279
280 return r;
281}
282
283static int
284dir_sync(Walfile f)
285{
286 Assert(f != NULL);
287
288 if (!dir_data->sync)
289 return 0;
290
291#ifdef HAVE_LIBZ
292 if (dir_data->compression > 0)
293 {
294 if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
295 return -1;
296 }
297#endif
298
299 return fsync(((DirectoryMethodFile *) f)->fd);
300}
301
302static ssize_t
303dir_get_file_size(const char *pathname)
304{
305 struct stat statbuf;
306 static char tmppath[MAXPGPATH];
307
308 snprintf(tmppath, sizeof(tmppath), "%s/%s",
309 dir_data->basedir, pathname);
310
311 if (stat(tmppath, &statbuf) != 0)
312 return -1;
313
314 return statbuf.st_size;
315}
316
317static bool
318dir_existsfile(const char *pathname)
319{
320 static char tmppath[MAXPGPATH];
321 int fd;
322
323 snprintf(tmppath, sizeof(tmppath), "%s/%s",
324 dir_data->basedir, pathname);
325
326 fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
327 if (fd < 0)
328 return false;
329 close(fd);
330 return true;
331}
332
333static bool
334dir_finish(void)
335{
336 if (dir_data->sync)
337 {
338 /*
339 * Files are fsynced when they are closed, but we need to fsync the
340 * directory entry here as well.
341 */
342 if (fsync_fname(dir_data->basedir, true) != 0)
343 return false;
344 }
345 return true;
346}
347
348
349WalWriteMethod *
350CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
351{
352 WalWriteMethod *method;
353
354 method = pg_malloc0(sizeof(WalWriteMethod));
355 method->open_for_write = dir_open_for_write;
356 method->write = dir_write;
357 method->get_current_pos = dir_get_current_pos;
358 method->get_file_size = dir_get_file_size;
359 method->close = dir_close;
360 method->sync = dir_sync;
361 method->existsfile = dir_existsfile;
362 method->finish = dir_finish;
363 method->getlasterror = dir_getlasterror;
364
365 dir_data = pg_malloc0(sizeof(DirectoryMethodData));
366 dir_data->compression = compression;
367 dir_data->basedir = pg_strdup(basedir);
368 dir_data->sync = sync;
369
370 return method;
371}
372
373void
374FreeWalDirectoryMethod(void)
375{
376 pg_free(dir_data->basedir);
377 pg_free(dir_data);
378}
379
380
381/*-------------------------------------------------------------------------
382 * WalTarMethod - write wal to a tar file containing pg_wal contents
383 *-------------------------------------------------------------------------
384 */
385
/*
 * State of the one member file currently being written into the tar
 * archive.  Created by tar_open_for_write() and freed by tar_close().
 */
typedef struct TarMethodFile
{
    off_t ofs_start; /* Where does the *header* for this file start */
    off_t currpos;   /* bytes of member data written so far (header not counted) */
    char header[512]; /* tar header block; size and name are rewritten on close */
    char *pathname;  /* copy of the member's final name, without temp suffix */
    size_t pad_to_size; /* requested zero-padded size of the member, 0 if none */
} TarMethodFile;
394
/*
 * Global static data for the tar method.  Allocated by CreateWalTarMethod()
 * and released by FreeWalTarMethod().
 */
typedef struct TarMethodData
{
    char *tarfilename;  /* tarbase plus ".tar" or ".tar.gz" */
    int fd;             /* descriptor of the archive; -1 while it is not open */
    int compression;    /* zlib level; 0 means the archive is not compressed */
    bool sync;          /* fsync the archive as it is written */
    TarMethodFile *currentfile; /* member being written, NULL if none */
    char lasterror[1024];       /* custom error text; empty means "use errno" */
#ifdef HAVE_LIBZ
    z_streamp zp;       /* deflate stream, set up on first open when compressing */
    void *zlibOut;      /* output buffer the deflate stream writes into */
#endif
} TarMethodData;
static TarMethodData *tar_data = NULL;
409
/*
 * Helpers for the tar method's custom error text.  An empty lasterror means
 * "no custom message, look at errno instead".  Each expansion is a single
 * parenthesized expression, so the macros can be used anywhere a function
 * call could without operator-precedence surprises.
 */
#define tar_clear_error() (tar_data->lasterror[0] = '\0')
#define tar_set_error(msg) (strlcpy(tar_data->lasterror, _(msg), sizeof(tar_data->lasterror)))
412
413static const char *
414tar_getlasterror(void)
415{
416 /*
417 * If a custom error is set, return that one. Otherwise, assume errno is
418 * set and return that one.
419 */
420 if (tar_data->lasterror[0])
421 return tar_data->lasterror;
422 return strerror(errno);
423}
424
#ifdef HAVE_LIBZ
/*
 * Feed count bytes from buf through the deflate stream and write whatever
 * output it produces to the archive.
 *
 * With flush set, the stream is driven with Z_FINISH until it reports
 * Z_STREAM_END and is then reset so that it can be used again.  Returns
 * false on failure, leaving either a custom error message or errno set.
 */
static bool
tar_write_compressed_data(void *buf, size_t count, bool flush)
{
    z_streamp   zs = tar_data->zp;

    zs->next_in = buf;
    zs->avail_in = count;

    while (zs->avail_in || flush)
    {
        int         status = deflate(zs, flush ? Z_FINISH : Z_NO_FLUSH);

        if (status == Z_STREAM_ERROR)
        {
            tar_set_error("could not compress data");
            return false;
        }

        /* Anything in the output buffer goes to the file right away */
        if (zs->avail_out < ZLIB_OUT_SIZE)
        {
            size_t      pending = ZLIB_OUT_SIZE - zs->avail_out;

            errno = 0;
            if (write(tar_data->fd, tar_data->zlibOut, pending) != pending)
            {
                /* a short write that left errno alone means: out of space */
                if (errno == 0)
                    errno = ENOSPC;
                return false;
            }

            /* the buffer is empty again, hand all of it back to zlib */
            zs->next_out = tar_data->zlibOut;
            zs->avail_out = ZLIB_OUT_SIZE;
        }

        if (status == Z_STREAM_END)
            break;
    }

    /* A finished stream must be reset before it accepts more input */
    if (flush && deflateReset(zs) != Z_OK)
    {
        tar_set_error("could not reset compression stream");
        return false;
    }

    return true;
}
#endif
479
480static ssize_t
481tar_write(Walfile f, const void *buf, size_t count)
482{
483 ssize_t r;
484
485 Assert(f != NULL);
486 tar_clear_error();
487
488 /* Tarfile will always be positioned at the end */
489 if (!tar_data->compression)
490 {
491 r = write(tar_data->fd, buf, count);
492 if (r > 0)
493 ((TarMethodFile *) f)->currpos += r;
494 return r;
495 }
496#ifdef HAVE_LIBZ
497 else
498 {
499 if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
500 return -1;
501 ((TarMethodFile *) f)->currpos += count;
502 return count;
503 }
504#else
505 else
506 /* Can't happen - compression enabled with no libz */
507 return -1;
508#endif
509}
510
511static bool
512tar_write_padding_data(TarMethodFile *f, size_t bytes)
513{
514 PGAlignedXLogBlock zerobuf;
515 size_t bytesleft = bytes;
516
517 memset(zerobuf.data, 0, XLOG_BLCKSZ);
518 while (bytesleft)
519 {
520 size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
521 ssize_t r = tar_write(f, zerobuf.data, bytestowrite);
522
523 if (r < 0)
524 return false;
525 bytesleft -= r;
526 }
527
528 return true;
529}
530
531static Walfile
532tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
533{
534 int save_errno;
535 static char tmppath[MAXPGPATH];
536
537 tar_clear_error();
538
539 if (tar_data->fd < 0)
540 {
541 /*
542 * We open the tar file only when we first try to write to it.
543 */
544 tar_data->fd = open(tar_data->tarfilename,
545 O_WRONLY | O_CREAT | PG_BINARY,
546 pg_file_create_mode);
547 if (tar_data->fd < 0)
548 return NULL;
549
550#ifdef HAVE_LIBZ
551 if (tar_data->compression)
552 {
553 tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
554 tar_data->zp->zalloc = Z_NULL;
555 tar_data->zp->zfree = Z_NULL;
556 tar_data->zp->opaque = Z_NULL;
557 tar_data->zp->next_out = tar_data->zlibOut;
558 tar_data->zp->avail_out = ZLIB_OUT_SIZE;
559
560 /*
561 * Initialize deflation library. Adding the magic value 16 to the
562 * default 15 for the windowBits parameter makes the output be
563 * gzip instead of zlib.
564 */
565 if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
566 {
567 pg_free(tar_data->zp);
568 tar_data->zp = NULL;
569 tar_set_error("could not initialize compression library");
570 return NULL;
571 }
572 }
573#endif
574
575 /* There's no tar header itself, the file starts with regular files */
576 }
577
578 Assert(tar_data->currentfile == NULL);
579 if (tar_data->currentfile != NULL)
580 {
581 tar_set_error("implementation error: tar files can't have more than one open file");
582 return NULL;
583 }
584
585 tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
586
587 snprintf(tmppath, sizeof(tmppath), "%s%s",
588 pathname, temp_suffix ? temp_suffix : "");
589
590 /* Create a header with size set to 0 - we will fill out the size on close */
591 if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
592 {
593 pg_free(tar_data->currentfile);
594 tar_data->currentfile = NULL;
595 tar_set_error("could not create tar header");
596 return NULL;
597 }
598
599#ifdef HAVE_LIBZ
600 if (tar_data->compression)
601 {
602 /* Flush existing data */
603 if (!tar_write_compressed_data(NULL, 0, true))
604 return NULL;
605
606 /* Turn off compression for header */
607 if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
608 {
609 tar_set_error("could not change compression parameters");
610 return NULL;
611 }
612 }
613#endif
614
615 tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
616 if (tar_data->currentfile->ofs_start == -1)
617 {
618 save_errno = errno;
619 pg_free(tar_data->currentfile);
620 tar_data->currentfile = NULL;
621 errno = save_errno;
622 return NULL;
623 }
624 tar_data->currentfile->currpos = 0;
625
626 if (!tar_data->compression)
627 {
628 errno = 0;
629 if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
630 {
631 save_errno = errno;
632 pg_free(tar_data->currentfile);
633 tar_data->currentfile = NULL;
634 /* if write didn't set errno, assume problem is no disk space */
635 errno = save_errno ? save_errno : ENOSPC;
636 return NULL;
637 }
638 }
639#ifdef HAVE_LIBZ
640 else
641 {
642 /* Write header through the zlib APIs but with no compression */
643 if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
644 return NULL;
645
646 /* Re-enable compression for the rest of the file */
647 if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
648 {
649 tar_set_error("could not change compression parameters");
650 return NULL;
651 }
652 }
653#endif
654
655 tar_data->currentfile->pathname = pg_strdup(pathname);
656
657 /*
658 * Uncompressed files are padded on creation, but for compression we can't
659 * do that
660 */
661 if (pad_to_size)
662 {
663 tar_data->currentfile->pad_to_size = pad_to_size;
664 if (!tar_data->compression)
665 {
666 /* Uncompressed, so pad now */
667 tar_write_padding_data(tar_data->currentfile, pad_to_size);
668 /* Seek back to start */
669 if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
670 return NULL;
671
672 tar_data->currentfile->currpos = 0;
673 }
674 }
675
676 return tar_data->currentfile;
677}
678
/*
 * Size lookup is not implemented for the tar method because nothing uses
 * it: always fails with errno set to ENOSYS.
 */
static ssize_t
tar_get_file_size(const char *pathname)
{
    const ssize_t unsupported = -1;

    tar_clear_error();

    errno = ENOSYS;
    return unsupported;
}
688
689static off_t
690tar_get_current_pos(Walfile f)
691{
692 Assert(f != NULL);
693 tar_clear_error();
694
695 return ((TarMethodFile *) f)->currpos;
696}
697
698static int
699tar_sync(Walfile f)
700{
701 Assert(f != NULL);
702 tar_clear_error();
703
704 if (!tar_data->sync)
705 return 0;
706
707 /*
708 * Always sync the whole tarfile, because that's all we can do. This makes
709 * no sense on compressed files, so just ignore those.
710 */
711 if (tar_data->compression)
712 return 0;
713
714 return fsync(tar_data->fd);
715}
716
717static int
718tar_close(Walfile f, WalCloseMethod method)
719{
720 ssize_t filesize;
721 int padding;
722 TarMethodFile *tf = (TarMethodFile *) f;
723
724 Assert(f != NULL);
725 tar_clear_error();
726
727 if (method == CLOSE_UNLINK)
728 {
729 if (tar_data->compression)
730 {
731 tar_set_error("unlink not supported with compression");
732 return -1;
733 }
734
735 /*
736 * Unlink the file that we just wrote to the tar. We do this by
737 * truncating it to the start of the header. This is safe as we only
738 * allow writing of the very last file.
739 */
740 if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
741 return -1;
742
743 pg_free(tf->pathname);
744 pg_free(tf);
745 tar_data->currentfile = NULL;
746
747 return 0;
748 }
749
750 /*
751 * Pad the file itself with zeroes if necessary. Note that this is
752 * different from the tar format padding -- this is the padding we asked
753 * for when the file was opened.
754 */
755 if (tf->pad_to_size)
756 {
757 if (tar_data->compression)
758 {
759 /*
760 * A compressed tarfile is padded on close since we cannot know
761 * the size of the compressed output until the end.
762 */
763 size_t sizeleft = tf->pad_to_size - tf->currpos;
764
765 if (sizeleft)
766 {
767 if (!tar_write_padding_data(tf, sizeleft))
768 return -1;
769 }
770 }
771 else
772 {
773 /*
774 * An uncompressed tarfile was padded on creation, so just adjust
775 * the current position as if we seeked to the end.
776 */
777 tf->currpos = tf->pad_to_size;
778 }
779 }
780
781 /*
782 * Get the size of the file, and pad the current data up to the nearest
783 * 512 byte boundary.
784 */
785 filesize = tar_get_current_pos(f);
786 padding = ((filesize + 511) & ~511) - filesize;
787 if (padding)
788 {
789 char zerobuf[512];
790
791 MemSet(zerobuf, 0, padding);
792 if (tar_write(f, zerobuf, padding) != padding)
793 return -1;
794 }
795
796
797#ifdef HAVE_LIBZ
798 if (tar_data->compression)
799 {
800 /* Flush the current buffer */
801 if (!tar_write_compressed_data(NULL, 0, true))
802 {
803 errno = EINVAL;
804 return -1;
805 }
806 }
807#endif
808
809 /*
810 * Now go back and update the header with the correct filesize and
811 * possibly also renaming the file. We overwrite the entire current header
812 * when done, including the checksum.
813 */
814 print_tar_number(&(tf->header[124]), 12, filesize);
815
816 if (method == CLOSE_NORMAL)
817
818 /*
819 * We overwrite it with what it was before if we have no tempname,
820 * since we're going to write the buffer anyway.
821 */
822 strlcpy(&(tf->header[0]), tf->pathname, 100);
823
824 print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
825 if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
826 return -1;
827 if (!tar_data->compression)
828 {
829 errno = 0;
830 if (write(tar_data->fd, tf->header, 512) != 512)
831 {
832 /* if write didn't set errno, assume problem is no disk space */
833 if (errno == 0)
834 errno = ENOSPC;
835 return -1;
836 }
837 }
838#ifdef HAVE_LIBZ
839 else
840 {
841 /* Turn off compression */
842 if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
843 {
844 tar_set_error("could not change compression parameters");
845 return -1;
846 }
847
848 /* Overwrite the header, assuming the size will be the same */
849 if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
850 return -1;
851
852 /* Turn compression back on */
853 if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
854 {
855 tar_set_error("could not change compression parameters");
856 return -1;
857 }
858 }
859#endif
860
861 /* Move file pointer back down to end, so we can write the next file */
862 if (lseek(tar_data->fd, 0, SEEK_END) < 0)
863 return -1;
864
865 /* Always fsync on close, so the padding gets fsynced */
866 if (tar_sync(f) < 0)
867 return -1;
868
869 /* Clean up and done */
870 pg_free(tf->pathname);
871 pg_free(tf);
872 tar_data->currentfile = NULL;
873
874 return 0;
875}
876
/*
 * Existence check for the tar method.
 *
 * Only archives created by this code are handled, so no file from elsewhere
 * can already be in one: the answer is always false, whatever the name.
 */
static bool
tar_existsfile(const char *pathname)
{
    (void) pathname;

    tar_clear_error();

    return false;
}
884
885static bool
886tar_finish(void)
887{
888 char zerobuf[1024];
889
890 tar_clear_error();
891
892 if (tar_data->currentfile)
893 {
894 if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
895 return false;
896 }
897
898 /* A tarfile always ends with two empty blocks */
899 MemSet(zerobuf, 0, sizeof(zerobuf));
900 if (!tar_data->compression)
901 {
902 errno = 0;
903 if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
904 {
905 /* if write didn't set errno, assume problem is no disk space */
906 if (errno == 0)
907 errno = ENOSPC;
908 return false;
909 }
910 }
911#ifdef HAVE_LIBZ
912 else
913 {
914 if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
915 return false;
916
917 /* Also flush all data to make sure the gzip stream is finished */
918 tar_data->zp->next_in = NULL;
919 tar_data->zp->avail_in = 0;
920 while (true)
921 {
922 int r;
923
924 r = deflate(tar_data->zp, Z_FINISH);
925
926 if (r == Z_STREAM_ERROR)
927 {
928 tar_set_error("could not compress data");
929 return false;
930 }
931 if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
932 {
933 size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
934
935 errno = 0;
936 if (write(tar_data->fd, tar_data->zlibOut, len) != len)
937 {
938 /*
939 * If write didn't set errno, assume problem is no disk
940 * space.
941 */
942 if (errno == 0)
943 errno = ENOSPC;
944 return false;
945 }
946 }
947 if (r == Z_STREAM_END)
948 break;
949 }
950
951 if (deflateEnd(tar_data->zp) != Z_OK)
952 {
953 tar_set_error("could not close compression stream");
954 return false;
955 }
956 }
957#endif
958
959 /* sync the empty blocks as well, since they're after the last file */
960 if (tar_data->sync)
961 {
962 if (fsync(tar_data->fd) != 0)
963 return false;
964 }
965
966 if (close(tar_data->fd) != 0)
967 return false;
968
969 tar_data->fd = -1;
970
971 if (tar_data->sync)
972 {
973 if (fsync_fname(tar_data->tarfilename, false) != 0)
974 return false;
975 if (fsync_parent_path(tar_data->tarfilename) != 0)
976 return false;
977 }
978
979 return true;
980}
981
982WalWriteMethod *
983CreateWalTarMethod(const char *tarbase, int compression, bool sync)
984{
985 WalWriteMethod *method;
986 const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
987
988 method = pg_malloc0(sizeof(WalWriteMethod));
989 method->open_for_write = tar_open_for_write;
990 method->write = tar_write;
991 method->get_current_pos = tar_get_current_pos;
992 method->get_file_size = tar_get_file_size;
993 method->close = tar_close;
994 method->sync = tar_sync;
995 method->existsfile = tar_existsfile;
996 method->finish = tar_finish;
997 method->getlasterror = tar_getlasterror;
998
999 tar_data = pg_malloc0(sizeof(TarMethodData));
1000 tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1001 sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
1002 tar_data->fd = -1;
1003 tar_data->compression = compression;
1004 tar_data->sync = sync;
1005#ifdef HAVE_LIBZ
1006 if (compression)
1007 tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
1008#endif
1009
1010 return method;
1011}
1012
1013void
1014FreeWalTarMethod(void)
1015{
1016 pg_free(tar_data->tarfilename);
1017#ifdef HAVE_LIBZ
1018 if (tar_data->compression)
1019 pg_free(tar_data->zlibOut);
1020#endif
1021 pg_free(tar_data);
1022}
1023