1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * pg_receivewal.c - receive streaming WAL data and write it |
4 | * to a local file. |
5 | * |
6 | * Author: Magnus Hagander <magnus@hagander.net> |
7 | * |
8 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
9 | * |
10 | * IDENTIFICATION |
11 | * src/bin/pg_basebackup/pg_receivewal.c |
12 | *------------------------------------------------------------------------- |
13 | */ |
14 | |
15 | #include "postgres_fe.h" |
16 | |
#include <dirent.h>
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <unistd.h>
21 | |
22 | #include "common/file_perm.h" |
23 | #include "common/logging.h" |
24 | #include "libpq-fe.h" |
25 | #include "access/xlog_internal.h" |
26 | #include "getopt_long.h" |
27 | |
28 | #include "receivelog.h" |
29 | #include "streamutil.h" |
30 | |
31 | |
32 | /* Time to sleep between reconnection attempts */ |
33 | #define RECONNECT_SLEEP_TIME 5 |
34 | |
35 | /* Global options */ |
36 | static char *basedir = NULL; |
37 | static int verbose = 0; |
38 | static int compresslevel = 0; |
39 | static int noloop = 0; |
40 | static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ |
41 | static volatile bool time_to_stop = false; |
42 | static bool do_create_slot = false; |
43 | static bool slot_exists_ok = false; |
44 | static bool do_drop_slot = false; |
45 | static bool do_sync = true; |
46 | static bool synchronous = false; |
47 | static char *replication_slot = NULL; |
48 | static XLogRecPtr endpos = InvalidXLogRecPtr; |
49 | |
50 | |
/* Forward declarations of the file-local routines defined below. */
static void usage(void);
static DIR *get_destination_dir(char *dest_folder);
static void close_destination_dir(DIR *dest_dir, char *dest_folder);
static XLogRecPtr FindStreamingStart(uint32 *tli);
static void StreamLog(void);
/* Installed as the stream_stop callback of the StreamCtl in StreamLog(). */
static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
bool segment_finished);
58 | |
59 | static void |
60 | disconnect_atexit(void) |
61 | { |
62 | if (conn != NULL) |
63 | PQfinish(conn); |
64 | } |
65 | |
/*
 * Routines to evaluate segment file format.
 *
 * A compressed segment name is exactly XLOG_FNAME_LEN upper-case hex digits
 * followed by ".gz"; a partial compressed segment uses ".gz.partial"
 * instead.  The strspn() test guarantees that at least XLOG_FNAME_LEN
 * characters exist before the suffix is inspected, and an exact suffix match
 * pins the total length, so no separate strlen() test is needed.
 *
 * Note that fname is evaluated more than once.
 */
#define IsCompressXLogFileName(fname) \
	(strspn((fname), "0123456789ABCDEF") == XLOG_FNAME_LEN && \
	 strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
#define IsPartialCompressXLogFileName(fname) \
	(strspn((fname), "0123456789ABCDEF") == XLOG_FNAME_LEN && \
	 strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
75 | |
/*
 * Print the command-line help text to stdout: a one-line description, the
 * usage synopsis, the general options, the connection options, the optional
 * slot actions and the bug-report address.
 *
 * Every message is wrapped in _() (presumably the translation macro; its
 * definition is not in this file).
 */
static void
usage(void)
{
	printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n" ),
	progname);
	printf(_("Usage:\n" ));
	printf(_(" %s [OPTION]...\n" ), progname);
	printf(_("\nOptions:\n" ));
	printf(_(" -D, --directory=DIR receive write-ahead log files into this directory\n" ));
	printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n" ));
	printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n" ));
	printf(_(" -n, --no-loop do not loop on connection lost\n" ));
	printf(_(" --no-sync do not wait for changes to be written safely to disk\n" ));
	/* the timeout is kept in milliseconds; show the default in seconds */
	printf(_(" -s, --status-interval=SECS\n"
	" time between status packets sent to server (default: %d)\n" ), (standby_message_timeout / 1000));
	printf(_(" -S, --slot=SLOTNAME replication slot to use\n" ));
	printf(_(" --synchronous flush write-ahead log immediately after writing\n" ));
	printf(_(" -v, --verbose output verbose messages\n" ));
	printf(_(" -V, --version output version information, then exit\n" ));
	printf(_(" -Z, --compress=0-9 compress logs with given compression level\n" ));
	printf(_(" -?, --help show this help, then exit\n" ));
	printf(_("\nConnection options:\n" ));
	printf(_(" -d, --dbname=CONNSTR connection string\n" ));
	printf(_(" -h, --host=HOSTNAME database server host or socket directory\n" ));
	printf(_(" -p, --port=PORT database server port number\n" ));
	printf(_(" -U, --username=NAME connect as specified database user\n" ));
	printf(_(" -w, --no-password never prompt for password\n" ));
	printf(_(" -W, --password force password prompt (should happen automatically)\n" ));
	printf(_("\nOptional actions:\n" ));
	printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n" ));
	printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n" ));
	printf(_("\nReport bugs to <pgsql-bugs@lists.postgresql.org>.\n" ));
}
109 | |
110 | static bool |
111 | stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) |
112 | { |
113 | static uint32 prevtimeline = 0; |
114 | static XLogRecPtr prevpos = InvalidXLogRecPtr; |
115 | |
116 | /* we assume that we get called once at the end of each segment */ |
117 | if (verbose && segment_finished) |
118 | pg_log_info("finished segment at %X/%X (timeline %u)" , |
119 | (uint32) (xlogpos >> 32), (uint32) xlogpos, |
120 | timeline); |
121 | |
122 | if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos) |
123 | { |
124 | if (verbose) |
125 | pg_log_info("stopped log streaming at %X/%X (timeline %u)" , |
126 | (uint32) (xlogpos >> 32), (uint32) xlogpos, |
127 | timeline); |
128 | time_to_stop = true; |
129 | return true; |
130 | } |
131 | |
132 | /* |
133 | * Note that we report the previous, not current, position here. After a |
134 | * timeline switch, xlogpos points to the beginning of the segment because |
135 | * that's where we always begin streaming. Reporting the end of previous |
136 | * timeline isn't totally accurate, because the next timeline can begin |
137 | * slightly before the end of the WAL that we received on the previous |
138 | * timeline, but it's close enough for reporting purposes. |
139 | */ |
140 | if (verbose && prevtimeline != 0 && prevtimeline != timeline) |
141 | pg_log_info("switched to timeline %u at %X/%X" , |
142 | timeline, |
143 | (uint32) (prevpos >> 32), (uint32) prevpos); |
144 | |
145 | prevtimeline = timeline; |
146 | prevpos = xlogpos; |
147 | |
148 | if (time_to_stop) |
149 | { |
150 | if (verbose) |
151 | pg_log_info("received interrupt signal, exiting" ); |
152 | return true; |
153 | } |
154 | return false; |
155 | } |
156 | |
157 | |
/*
 * Get destination directory.
 *
 * Opens dest_folder (which must not be NULL) and returns the directory
 * stream.  On failure an error naming dest_folder is logged and the program
 * exits with status 1, so the function never returns NULL.
 */
static DIR *
get_destination_dir(char *dest_folder)
{
	DIR		   *dir;

	Assert(dest_folder != NULL);
	dir = opendir(dest_folder);
	if (dir == NULL)
	{
		/* report the directory we actually tried to open */
		pg_log_error("could not open directory \"%s\": %m", dest_folder);
		exit(1);
	}

	return dir;
}
176 | |
177 | |
/*
 * Close existing directory.
 *
 * dest_dir is the open directory stream and dest_folder its path, used only
 * for the error message.  If closedir() fails, the error is logged and the
 * program exits with status 1.
 */
static void
close_destination_dir(DIR *dest_dir, char *dest_folder)
{
	Assert(dest_dir != NULL && dest_folder != NULL);

	if (closedir(dest_dir) == 0)
		return;

	pg_log_error("could not close directory \"%s\": %m", dest_folder);
	exit(1);
}
191 | |
192 | |
193 | /* |
194 | * Determine starting location for streaming, based on any existing xlog |
195 | * segments in the directory. We start at the end of the last one that is |
196 | * complete (size matches wal segment size), on the timeline with highest ID. |
197 | * |
198 | * If there are no WAL files in the directory, returns InvalidXLogRecPtr. |
199 | */ |
200 | static XLogRecPtr |
201 | FindStreamingStart(uint32 *tli) |
202 | { |
203 | DIR *dir; |
204 | struct dirent *dirent; |
205 | XLogSegNo high_segno = 0; |
206 | uint32 high_tli = 0; |
207 | bool high_ispartial = false; |
208 | |
209 | dir = get_destination_dir(basedir); |
210 | |
211 | while (errno = 0, (dirent = readdir(dir)) != NULL) |
212 | { |
213 | uint32 tli; |
214 | XLogSegNo segno; |
215 | bool ispartial; |
216 | bool iscompress; |
217 | |
218 | /* |
219 | * Check if the filename looks like an xlog file, or a .partial file. |
220 | */ |
221 | if (IsXLogFileName(dirent->d_name)) |
222 | { |
223 | ispartial = false; |
224 | iscompress = false; |
225 | } |
226 | else if (IsPartialXLogFileName(dirent->d_name)) |
227 | { |
228 | ispartial = true; |
229 | iscompress = false; |
230 | } |
231 | else if (IsCompressXLogFileName(dirent->d_name)) |
232 | { |
233 | ispartial = false; |
234 | iscompress = true; |
235 | } |
236 | else if (IsPartialCompressXLogFileName(dirent->d_name)) |
237 | { |
238 | ispartial = true; |
239 | iscompress = true; |
240 | } |
241 | else |
242 | continue; |
243 | |
244 | /* |
245 | * Looks like an xlog file. Parse its position. |
246 | */ |
247 | XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz); |
248 | |
249 | /* |
250 | * Check that the segment has the right size, if it's supposed to be |
251 | * completed. For non-compressed segments just check the on-disk size |
252 | * and see if it matches a completed segment. For compressed segments, |
253 | * look at the last 4 bytes of the compressed file, which is where the |
254 | * uncompressed size is located for gz files with a size lower than |
255 | * 4GB, and then compare it to the size of a completed segment. The 4 |
256 | * last bytes correspond to the ISIZE member according to |
257 | * http://www.zlib.org/rfc-gzip.html. |
258 | */ |
259 | if (!ispartial && !iscompress) |
260 | { |
261 | struct stat statbuf; |
262 | char fullpath[MAXPGPATH * 2]; |
263 | |
264 | snprintf(fullpath, sizeof(fullpath), "%s/%s" , basedir, dirent->d_name); |
265 | if (stat(fullpath, &statbuf) != 0) |
266 | { |
267 | pg_log_error("could not stat file \"%s\": %m" , fullpath); |
268 | exit(1); |
269 | } |
270 | |
271 | if (statbuf.st_size != WalSegSz) |
272 | { |
273 | pg_log_warning("segment file \"%s\" has incorrect size %d, skipping" , |
274 | dirent->d_name, (int) statbuf.st_size); |
275 | continue; |
276 | } |
277 | } |
278 | else if (!ispartial && iscompress) |
279 | { |
280 | int fd; |
281 | char buf[4]; |
282 | int bytes_out; |
283 | char fullpath[MAXPGPATH * 2]; |
284 | int r; |
285 | |
286 | snprintf(fullpath, sizeof(fullpath), "%s/%s" , basedir, dirent->d_name); |
287 | |
288 | fd = open(fullpath, O_RDONLY | PG_BINARY, 0); |
289 | if (fd < 0) |
290 | { |
291 | pg_log_error("could not open compressed file \"%s\": %m" , |
292 | fullpath); |
293 | exit(1); |
294 | } |
295 | if (lseek(fd, (off_t) (-4), SEEK_END) < 0) |
296 | { |
297 | pg_log_error("could not seek in compressed file \"%s\": %m" , |
298 | fullpath); |
299 | exit(1); |
300 | } |
301 | r = read(fd, (char *) buf, sizeof(buf)); |
302 | if (r != sizeof(buf)) |
303 | { |
304 | if (r < 0) |
305 | pg_log_error("could not read compressed file \"%s\": %m" , |
306 | fullpath); |
307 | else |
308 | pg_log_error("could not read compressed file \"%s\": read %d of %zu" , |
309 | fullpath, r, sizeof(buf)); |
310 | exit(1); |
311 | } |
312 | |
313 | close(fd); |
314 | bytes_out = (buf[3] << 24) | (buf[2] << 16) | |
315 | (buf[1] << 8) | buf[0]; |
316 | |
317 | if (bytes_out != WalSegSz) |
318 | { |
319 | pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %d, skipping" , |
320 | dirent->d_name, bytes_out); |
321 | continue; |
322 | } |
323 | } |
324 | |
325 | /* Looks like a valid segment. Remember that we saw it. */ |
326 | if ((segno > high_segno) || |
327 | (segno == high_segno && tli > high_tli) || |
328 | (segno == high_segno && tli == high_tli && high_ispartial && !ispartial)) |
329 | { |
330 | high_segno = segno; |
331 | high_tli = tli; |
332 | high_ispartial = ispartial; |
333 | } |
334 | } |
335 | |
336 | if (errno) |
337 | { |
338 | pg_log_error("could not read directory \"%s\": %m" , basedir); |
339 | exit(1); |
340 | } |
341 | |
342 | close_destination_dir(dir, basedir); |
343 | |
344 | if (high_segno > 0) |
345 | { |
346 | XLogRecPtr high_ptr; |
347 | |
348 | /* |
349 | * Move the starting pointer to the start of the next segment, if the |
350 | * highest one we saw was completed. Otherwise start streaming from |
351 | * the beginning of the .partial segment. |
352 | */ |
353 | if (!high_ispartial) |
354 | high_segno++; |
355 | |
356 | XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr); |
357 | |
358 | *tli = high_tli; |
359 | return high_ptr; |
360 | } |
361 | else |
362 | return InvalidXLogRecPtr; |
363 | } |
364 | |
365 | /* |
366 | * Start the log streaming |
367 | */ |
368 | static void |
369 | StreamLog(void) |
370 | { |
371 | XLogRecPtr serverpos; |
372 | TimeLineID servertli; |
373 | StreamCtl stream; |
374 | |
375 | MemSet(&stream, 0, sizeof(stream)); |
376 | |
377 | /* |
378 | * Connect in replication mode to the server |
379 | */ |
380 | if (conn == NULL) |
381 | conn = GetConnection(); |
382 | if (!conn) |
383 | /* Error message already written in GetConnection() */ |
384 | return; |
385 | |
386 | if (!CheckServerVersionForStreaming(conn)) |
387 | { |
388 | /* |
389 | * Error message already written in CheckServerVersionForStreaming(). |
390 | * There's no hope of recovering from a version mismatch, so don't |
391 | * retry. |
392 | */ |
393 | exit(1); |
394 | } |
395 | |
396 | /* |
397 | * Identify server, obtaining start LSN position and current timeline ID |
398 | * at the same time, necessary if not valid data can be found in the |
399 | * existing output directory. |
400 | */ |
401 | if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL)) |
402 | exit(1); |
403 | |
404 | /* |
405 | * Figure out where to start streaming. |
406 | */ |
407 | stream.startpos = FindStreamingStart(&stream.timeline); |
408 | if (stream.startpos == InvalidXLogRecPtr) |
409 | { |
410 | stream.startpos = serverpos; |
411 | stream.timeline = servertli; |
412 | } |
413 | |
414 | /* |
415 | * Always start streaming at the beginning of a segment |
416 | */ |
417 | stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz); |
418 | |
419 | /* |
420 | * Start the replication |
421 | */ |
422 | if (verbose) |
423 | pg_log_info("starting log streaming at %X/%X (timeline %u)" , |
424 | (uint32) (stream.startpos >> 32), (uint32) stream.startpos, |
425 | stream.timeline); |
426 | |
427 | stream.stream_stop = stop_streaming; |
428 | stream.stop_socket = PGINVALID_SOCKET; |
429 | stream.standby_message_timeout = standby_message_timeout; |
430 | stream.synchronous = synchronous; |
431 | stream.do_sync = do_sync; |
432 | stream.mark_done = false; |
433 | stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel, |
434 | stream.do_sync); |
435 | stream.partial_suffix = ".partial" ; |
436 | stream.replication_slot = replication_slot; |
437 | |
438 | ReceiveXlogStream(conn, &stream); |
439 | |
440 | if (!stream.walmethod->finish()) |
441 | { |
442 | pg_log_info("could not finish writing WAL files: %m" ); |
443 | return; |
444 | } |
445 | |
446 | PQfinish(conn); |
447 | conn = NULL; |
448 | |
449 | FreeWalDirectoryMethod(); |
450 | pg_free(stream.walmethod); |
451 | |
452 | conn = NULL; |
453 | } |
454 | |
/*
 * SIGINT handler (not built on WIN32): do no work here, just raise the
 * time_to_stop flag so that streaming stops and the program exits at the
 * next possible moment.  The signal number is not used.
 */
#ifndef WIN32

static void
sigint_handler(int signum)
{
	time_to_stop = true;
}
#endif
467 | |
468 | int |
469 | main(int argc, char **argv) |
470 | { |
471 | static struct option long_options[] = { |
472 | {"help" , no_argument, NULL, '?'}, |
473 | {"version" , no_argument, NULL, 'V'}, |
474 | {"directory" , required_argument, NULL, 'D'}, |
475 | {"dbname" , required_argument, NULL, 'd'}, |
476 | {"endpos" , required_argument, NULL, 'E'}, |
477 | {"host" , required_argument, NULL, 'h'}, |
478 | {"port" , required_argument, NULL, 'p'}, |
479 | {"username" , required_argument, NULL, 'U'}, |
480 | {"no-loop" , no_argument, NULL, 'n'}, |
481 | {"no-password" , no_argument, NULL, 'w'}, |
482 | {"password" , no_argument, NULL, 'W'}, |
483 | {"status-interval" , required_argument, NULL, 's'}, |
484 | {"slot" , required_argument, NULL, 'S'}, |
485 | {"verbose" , no_argument, NULL, 'v'}, |
486 | {"compress" , required_argument, NULL, 'Z'}, |
487 | /* action */ |
488 | {"create-slot" , no_argument, NULL, 1}, |
489 | {"drop-slot" , no_argument, NULL, 2}, |
490 | {"if-not-exists" , no_argument, NULL, 3}, |
491 | {"synchronous" , no_argument, NULL, 4}, |
492 | {"no-sync" , no_argument, NULL, 5}, |
493 | {NULL, 0, NULL, 0} |
494 | }; |
495 | |
496 | int c; |
497 | int option_index; |
498 | char *db_name; |
499 | uint32 hi, |
500 | lo; |
501 | |
502 | pg_logging_init(argv[0]); |
503 | progname = get_progname(argv[0]); |
504 | set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup" )); |
505 | |
506 | if (argc > 1) |
507 | { |
508 | if (strcmp(argv[1], "--help" ) == 0 || strcmp(argv[1], "-?" ) == 0) |
509 | { |
510 | usage(); |
511 | exit(0); |
512 | } |
513 | else if (strcmp(argv[1], "-V" ) == 0 || |
514 | strcmp(argv[1], "--version" ) == 0) |
515 | { |
516 | puts("pg_receivewal (PostgreSQL) " PG_VERSION); |
517 | exit(0); |
518 | } |
519 | } |
520 | |
521 | while ((c = getopt_long(argc, argv, "D:d:E:h:p:U:s:S:nwWvZ:" , |
522 | long_options, &option_index)) != -1) |
523 | { |
524 | switch (c) |
525 | { |
526 | case 'D': |
527 | basedir = pg_strdup(optarg); |
528 | break; |
529 | case 'd': |
530 | connection_string = pg_strdup(optarg); |
531 | break; |
532 | case 'h': |
533 | dbhost = pg_strdup(optarg); |
534 | break; |
535 | case 'p': |
536 | if (atoi(optarg) <= 0) |
537 | { |
538 | pg_log_error("invalid port number \"%s\"" , optarg); |
539 | exit(1); |
540 | } |
541 | dbport = pg_strdup(optarg); |
542 | break; |
543 | case 'U': |
544 | dbuser = pg_strdup(optarg); |
545 | break; |
546 | case 'w': |
547 | dbgetpassword = -1; |
548 | break; |
549 | case 'W': |
550 | dbgetpassword = 1; |
551 | break; |
552 | case 's': |
553 | standby_message_timeout = atoi(optarg) * 1000; |
554 | if (standby_message_timeout < 0) |
555 | { |
556 | pg_log_error("invalid status interval \"%s\"" , optarg); |
557 | exit(1); |
558 | } |
559 | break; |
560 | case 'S': |
561 | replication_slot = pg_strdup(optarg); |
562 | break; |
563 | case 'E': |
564 | if (sscanf(optarg, "%X/%X" , &hi, &lo) != 2) |
565 | { |
566 | pg_log_error("could not parse end position \"%s\"" , optarg); |
567 | exit(1); |
568 | } |
569 | endpos = ((uint64) hi) << 32 | lo; |
570 | break; |
571 | case 'n': |
572 | noloop = 1; |
573 | break; |
574 | case 'v': |
575 | verbose++; |
576 | break; |
577 | case 'Z': |
578 | compresslevel = atoi(optarg); |
579 | if (compresslevel < 0 || compresslevel > 9) |
580 | { |
581 | pg_log_error("invalid compression level \"%s\"" , optarg); |
582 | exit(1); |
583 | } |
584 | break; |
585 | /* action */ |
586 | case 1: |
587 | do_create_slot = true; |
588 | break; |
589 | case 2: |
590 | do_drop_slot = true; |
591 | break; |
592 | case 3: |
593 | slot_exists_ok = true; |
594 | break; |
595 | case 4: |
596 | synchronous = true; |
597 | break; |
598 | case 5: |
599 | do_sync = false; |
600 | break; |
601 | default: |
602 | |
603 | /* |
604 | * getopt_long already emitted a complaint |
605 | */ |
606 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
607 | progname); |
608 | exit(1); |
609 | } |
610 | } |
611 | |
612 | /* |
613 | * Any non-option arguments? |
614 | */ |
615 | if (optind < argc) |
616 | { |
617 | pg_log_error("too many command-line arguments (first is \"%s\")" , |
618 | argv[optind]); |
619 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
620 | progname); |
621 | exit(1); |
622 | } |
623 | |
624 | if (do_drop_slot && do_create_slot) |
625 | { |
626 | pg_log_error("cannot use --create-slot together with --drop-slot" ); |
627 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
628 | progname); |
629 | exit(1); |
630 | } |
631 | |
632 | if (replication_slot == NULL && (do_drop_slot || do_create_slot)) |
633 | { |
634 | /* translator: second %s is an option name */ |
635 | pg_log_error("%s needs a slot to be specified using --slot" , |
636 | do_drop_slot ? "--drop-slot" : "--create-slot" ); |
637 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
638 | progname); |
639 | exit(1); |
640 | } |
641 | |
642 | if (synchronous && !do_sync) |
643 | { |
644 | pg_log_error("cannot use --synchronous together with --no-sync" ); |
645 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
646 | progname); |
647 | exit(1); |
648 | } |
649 | |
650 | /* |
651 | * Required arguments |
652 | */ |
653 | if (basedir == NULL && !do_drop_slot && !do_create_slot) |
654 | { |
655 | pg_log_error("no target directory specified" ); |
656 | fprintf(stderr, _("Try \"%s --help\" for more information.\n" ), |
657 | progname); |
658 | exit(1); |
659 | } |
660 | |
661 | #ifndef HAVE_LIBZ |
662 | if (compresslevel != 0) |
663 | { |
664 | pg_log_error("this build does not support compression" ); |
665 | exit(1); |
666 | } |
667 | #endif |
668 | |
669 | /* |
670 | * Check existence of destination folder. |
671 | */ |
672 | if (!do_drop_slot && !do_create_slot) |
673 | { |
674 | DIR *dir = get_destination_dir(basedir); |
675 | |
676 | close_destination_dir(dir, basedir); |
677 | } |
678 | |
679 | #ifndef WIN32 |
680 | pqsignal(SIGINT, sigint_handler); |
681 | #endif |
682 | |
683 | /* |
684 | * Obtain a connection before doing anything. |
685 | */ |
686 | conn = GetConnection(); |
687 | if (!conn) |
688 | /* error message already written in GetConnection() */ |
689 | exit(1); |
690 | atexit(disconnect_atexit); |
691 | |
692 | /* |
693 | * Run IDENTIFY_SYSTEM to make sure we've successfully have established a |
694 | * replication connection and haven't connected using a database specific |
695 | * connection. |
696 | */ |
697 | if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name)) |
698 | exit(1); |
699 | |
700 | /* |
701 | * Set umask so that directories/files are created with the same |
702 | * permissions as directories/files in the source data directory. |
703 | * |
704 | * pg_mode_mask is set to owner-only by default and then updated in |
705 | * GetConnection() where we get the mode from the server-side with |
706 | * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm(). |
707 | */ |
708 | umask(pg_mode_mask); |
709 | |
710 | /* determine remote server's xlog segment size */ |
711 | if (!RetrieveWalSegSize(conn)) |
712 | exit(1); |
713 | |
714 | /* |
715 | * Check that there is a database associated with connection, none should |
716 | * be defined in this context. |
717 | */ |
718 | if (db_name) |
719 | { |
720 | pg_log_error("replication connection using slot \"%s\" is unexpectedly database specific" , |
721 | replication_slot); |
722 | exit(1); |
723 | } |
724 | |
725 | /* |
726 | * Drop a replication slot. |
727 | */ |
728 | if (do_drop_slot) |
729 | { |
730 | if (verbose) |
731 | pg_log_info("dropping replication slot \"%s\"" , replication_slot); |
732 | |
733 | if (!DropReplicationSlot(conn, replication_slot)) |
734 | exit(1); |
735 | exit(0); |
736 | } |
737 | |
738 | /* Create a replication slot */ |
739 | if (do_create_slot) |
740 | { |
741 | if (verbose) |
742 | pg_log_info("creating replication slot \"%s\"" , replication_slot); |
743 | |
744 | if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false, |
745 | slot_exists_ok)) |
746 | exit(1); |
747 | exit(0); |
748 | } |
749 | |
750 | /* |
751 | * Don't close the connection here so that subsequent StreamLog() can |
752 | * reuse it. |
753 | */ |
754 | |
755 | while (true) |
756 | { |
757 | StreamLog(); |
758 | if (time_to_stop) |
759 | { |
760 | /* |
761 | * We've been Ctrl-C'ed or end of streaming position has been |
762 | * willingly reached, so exit without an error code. |
763 | */ |
764 | exit(0); |
765 | } |
766 | else if (noloop) |
767 | { |
768 | pg_log_error("disconnected" ); |
769 | exit(1); |
770 | } |
771 | else |
772 | { |
773 | /* translator: check source for value for %d */ |
774 | pg_log_info("disconnected; waiting %d seconds to try again" , |
775 | RECONNECT_SLEEP_TIME); |
776 | pg_usleep(RECONNECT_SLEEP_TIME * 1000000); |
777 | } |
778 | } |
779 | } |
780 | |