1/*-------------------------------------------------------------------------
2 *
3 * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 * fashion and write it to a local file.
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_recvlogical.c
10 *-------------------------------------------------------------------------
11 */
12
13#include "postgres_fe.h"
14
15#include <dirent.h>
16#include <sys/stat.h>
17#include <unistd.h>
18#ifdef HAVE_SYS_SELECT_H
19#include <sys/select.h>
20#endif
21
22/* local includes */
23#include "streamutil.h"
24
25#include "access/xlog_internal.h"
26#include "common/file_perm.h"
27#include "common/fe_memutils.h"
28#include "common/logging.h"
29#include "getopt_long.h"
30#include "libpq-fe.h"
31#include "libpq/pqsignal.h"
32#include "pqexpbuffer.h"
33
34
35/* Time to sleep between reconnection attempts */
36#define RECONNECT_SLEEP_TIME 5
37
38/* Global Options */
39static char *outfile = NULL;
40static int verbose = 0;
41static int noloop = 0;
42static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
43static int fsync_interval = 10 * 1000; /* 10 sec = default */
44static XLogRecPtr startpos = InvalidXLogRecPtr;
45static XLogRecPtr endpos = InvalidXLogRecPtr;
46static bool do_create_slot = false;
47static bool slot_exists_ok = false;
48static bool do_start_slot = false;
49static bool do_drop_slot = false;
50static char *replication_slot = NULL;
51
52/* filled pairwise with option, value. value may be NULL */
53static char **options;
54static size_t noptions = 0;
55static const char *plugin = "test_decoding";
56
57/* Global State */
58static int outfd = -1;
59static volatile sig_atomic_t time_to_abort = false;
60static volatile sig_atomic_t output_reopen = false;
61static bool output_isfile;
62static TimestampTz output_last_fsync = -1;
63static bool output_needs_fsync = false;
64static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
65static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
66
67static void usage(void);
68static void StreamLogicalLog(void);
69static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
70static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
71 bool keepalive, XLogRecPtr lsn);
72
73static void
74usage(void)
75{
76 printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
77 progname);
78 printf(_("Usage:\n"));
79 printf(_(" %s [OPTION]...\n"), progname);
80 printf(_("\nAction to be performed:\n"));
81 printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
82 printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
83 printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
84 printf(_("\nOptions:\n"));
85 printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
86 printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
87 printf(_(" -F --fsync-interval=SECS\n"
88 " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
89 printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
90 printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
91 printf(_(" -n, --no-loop do not loop on connection lost\n"));
92 printf(_(" -o, --option=NAME[=VALUE]\n"
93 " pass option NAME with optional value VALUE to the\n"
94 " output plugin\n"));
95 printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
96 printf(_(" -s, --status-interval=SECS\n"
97 " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
98 printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
99 printf(_(" -v, --verbose output verbose messages\n"));
100 printf(_(" -V, --version output version information, then exit\n"));
101 printf(_(" -?, --help show this help, then exit\n"));
102 printf(_("\nConnection options:\n"));
103 printf(_(" -d, --dbname=DBNAME database to connect to\n"));
104 printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
105 printf(_(" -p, --port=PORT database server port number\n"));
106 printf(_(" -U, --username=NAME connect as specified database user\n"));
107 printf(_(" -w, --no-password never prompt for password\n"));
108 printf(_(" -W, --password force password prompt (should happen automatically)\n"));
109 printf(_("\nReport bugs to <pgsql-bugs@lists.postgresql.org>.\n"));
110}
111
112/*
113 * Send a Standby Status Update message to server.
114 */
115static bool
116sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
117{
118 static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
119 static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
120
121 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
122 int len = 0;
123
124 /*
125 * we normally don't want to send superfluous feedback, but if it's
126 * because of a timeout we need to, otherwise wal_sender_timeout will kill
127 * us.
128 */
129 if (!force &&
130 last_written_lsn == output_written_lsn &&
131 last_fsync_lsn != output_fsync_lsn)
132 return true;
133
134 if (verbose)
135 pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
136 (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
137 (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
138 replication_slot);
139
140 replybuf[len] = 'r';
141 len += 1;
142 fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
143 len += 8;
144 fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
145 len += 8;
146 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
147 len += 8;
148 fe_sendint64(now, &replybuf[len]); /* sendTime */
149 len += 8;
150 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
151 len += 1;
152
153 startpos = output_written_lsn;
154 last_written_lsn = output_written_lsn;
155 last_fsync_lsn = output_fsync_lsn;
156
157 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
158 {
159 pg_log_error("could not send feedback packet: %s",
160 PQerrorMessage(conn));
161 return false;
162 }
163
164 return true;
165}
166
167static void
168disconnect_atexit(void)
169{
170 if (conn != NULL)
171 PQfinish(conn);
172}
173
174static bool
175OutputFsync(TimestampTz now)
176{
177 output_last_fsync = now;
178
179 output_fsync_lsn = output_written_lsn;
180
181 if (fsync_interval <= 0)
182 return true;
183
184 if (!output_needs_fsync)
185 return true;
186
187 output_needs_fsync = false;
188
189 /* can only fsync if it's a regular file */
190 if (!output_isfile)
191 return true;
192
193 if (fsync(outfd) != 0)
194 {
195 pg_log_error("could not fsync file \"%s\": %m", outfile);
196 return false;
197 }
198
199 return true;
200}
201
202/*
203 * Start the log streaming
204 */
205static void
206StreamLogicalLog(void)
207{
208 PGresult *res;
209 char *copybuf = NULL;
210 TimestampTz last_status = -1;
211 int i;
212 PQExpBuffer query;
213
214 output_written_lsn = InvalidXLogRecPtr;
215 output_fsync_lsn = InvalidXLogRecPtr;
216
217 query = createPQExpBuffer();
218
219 /*
220 * Connect in replication mode to the server
221 */
222 if (!conn)
223 conn = GetConnection();
224 if (!conn)
225 /* Error message already written in GetConnection() */
226 return;
227
228 /*
229 * Start the replication
230 */
231 if (verbose)
232 pg_log_info("starting log streaming at %X/%X (slot %s)",
233 (uint32) (startpos >> 32), (uint32) startpos,
234 replication_slot);
235
236 /* Initiate the replication stream at specified location */
237 appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
238 replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
239
240 /* print options if there are any */
241 if (noptions)
242 appendPQExpBufferStr(query, " (");
243
244 for (i = 0; i < noptions; i++)
245 {
246 /* separator */
247 if (i > 0)
248 appendPQExpBufferStr(query, ", ");
249
250 /* write option name */
251 appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
252
253 /* write option value if specified */
254 if (options[(i * 2) + 1] != NULL)
255 appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
256 }
257
258 if (noptions)
259 appendPQExpBufferChar(query, ')');
260
261 res = PQexec(conn, query->data);
262 if (PQresultStatus(res) != PGRES_COPY_BOTH)
263 {
264 pg_log_error("could not send replication command \"%s\": %s",
265 query->data, PQresultErrorMessage(res));
266 PQclear(res);
267 goto error;
268 }
269 PQclear(res);
270 resetPQExpBuffer(query);
271
272 if (verbose)
273 pg_log_info("streaming initiated");
274
275 while (!time_to_abort)
276 {
277 int r;
278 int bytes_left;
279 int bytes_written;
280 TimestampTz now;
281 int hdr_len;
282 XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
283
284 if (copybuf != NULL)
285 {
286 PQfreemem(copybuf);
287 copybuf = NULL;
288 }
289
290 /*
291 * Potentially send a status message to the master
292 */
293 now = feGetCurrentTimestamp();
294
295 if (outfd != -1 &&
296 feTimestampDifferenceExceeds(output_last_fsync, now,
297 fsync_interval))
298 {
299 if (!OutputFsync(now))
300 goto error;
301 }
302
303 if (standby_message_timeout > 0 &&
304 feTimestampDifferenceExceeds(last_status, now,
305 standby_message_timeout))
306 {
307 /* Time to send feedback! */
308 if (!sendFeedback(conn, now, true, false))
309 goto error;
310
311 last_status = now;
312 }
313
314 /* got SIGHUP, close output file */
315 if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
316 {
317 now = feGetCurrentTimestamp();
318 if (!OutputFsync(now))
319 goto error;
320 close(outfd);
321 outfd = -1;
322 }
323 output_reopen = false;
324
325 /* open the output file, if not open yet */
326 if (outfd == -1)
327 {
328 struct stat statbuf;
329
330 if (strcmp(outfile, "-") == 0)
331 outfd = fileno(stdout);
332 else
333 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
334 S_IRUSR | S_IWUSR);
335 if (outfd == -1)
336 {
337 pg_log_error("could not open log file \"%s\": %m", outfile);
338 goto error;
339 }
340
341 if (fstat(outfd, &statbuf) != 0)
342 pg_log_error("could not stat file \"%s\": %m", outfile);
343
344 output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
345 }
346
347 r = PQgetCopyData(conn, &copybuf, 1);
348 if (r == 0)
349 {
350 /*
351 * In async mode, and no data available. We block on reading but
352 * not more than the specified timeout, so that we can send a
353 * response back to the client.
354 */
355 fd_set input_mask;
356 TimestampTz message_target = 0;
357 TimestampTz fsync_target = 0;
358 struct timeval timeout;
359 struct timeval *timeoutptr = NULL;
360
361 if (PQsocket(conn) < 0)
362 {
363 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
364 goto error;
365 }
366
367 FD_ZERO(&input_mask);
368 FD_SET(PQsocket(conn), &input_mask);
369
370 /* Compute when we need to wakeup to send a keepalive message. */
371 if (standby_message_timeout)
372 message_target = last_status + (standby_message_timeout - 1) *
373 ((int64) 1000);
374
375 /* Compute when we need to wakeup to fsync the output file. */
376 if (fsync_interval > 0 && output_needs_fsync)
377 fsync_target = output_last_fsync + (fsync_interval - 1) *
378 ((int64) 1000);
379
380 /* Now compute when to wakeup. */
381 if (message_target > 0 || fsync_target > 0)
382 {
383 TimestampTz targettime;
384 long secs;
385 int usecs;
386
387 targettime = message_target;
388
389 if (fsync_target > 0 && fsync_target < targettime)
390 targettime = fsync_target;
391
392 feTimestampDifference(now,
393 targettime,
394 &secs,
395 &usecs);
396 if (secs <= 0)
397 timeout.tv_sec = 1; /* Always sleep at least 1 sec */
398 else
399 timeout.tv_sec = secs;
400 timeout.tv_usec = usecs;
401 timeoutptr = &timeout;
402 }
403
404 r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
405 if (r == 0 || (r < 0 && errno == EINTR))
406 {
407 /*
408 * Got a timeout or signal. Continue the loop and either
409 * deliver a status packet to the server or just go back into
410 * blocking.
411 */
412 continue;
413 }
414 else if (r < 0)
415 {
416 pg_log_error("select() failed: %m");
417 goto error;
418 }
419
420 /* Else there is actually data on the socket */
421 if (PQconsumeInput(conn) == 0)
422 {
423 pg_log_error("could not receive data from WAL stream: %s",
424 PQerrorMessage(conn));
425 goto error;
426 }
427 continue;
428 }
429
430 /* End of copy stream */
431 if (r == -1)
432 break;
433
434 /* Failure while reading the copy stream */
435 if (r == -2)
436 {
437 pg_log_error("could not read COPY data: %s",
438 PQerrorMessage(conn));
439 goto error;
440 }
441
442 /* Check the message type. */
443 if (copybuf[0] == 'k')
444 {
445 int pos;
446 bool replyRequested;
447 XLogRecPtr walEnd;
448 bool endposReached = false;
449
450 /*
451 * Parse the keepalive message, enclosed in the CopyData message.
452 * We just check if the server requested a reply, and ignore the
453 * rest.
454 */
455 pos = 1; /* skip msgtype 'k' */
456 walEnd = fe_recvint64(&copybuf[pos]);
457 output_written_lsn = Max(walEnd, output_written_lsn);
458
459 pos += 8; /* read walEnd */
460
461 pos += 8; /* skip sendTime */
462
463 if (r < pos + 1)
464 {
465 pg_log_error("streaming header too small: %d", r);
466 goto error;
467 }
468 replyRequested = copybuf[pos];
469
470 if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
471 {
472 /*
473 * If there's nothing to read on the socket until a keepalive
474 * we know that the server has nothing to send us; and if
475 * walEnd has passed endpos, we know nothing else can have
476 * committed before endpos. So we can bail out now.
477 */
478 endposReached = true;
479 }
480
481 /* Send a reply, if necessary */
482 if (replyRequested || endposReached)
483 {
484 if (!flushAndSendFeedback(conn, &now))
485 goto error;
486 last_status = now;
487 }
488
489 if (endposReached)
490 {
491 prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
492 time_to_abort = true;
493 break;
494 }
495
496 continue;
497 }
498 else if (copybuf[0] != 'w')
499 {
500 pg_log_error("unrecognized streaming header: \"%c\"",
501 copybuf[0]);
502 goto error;
503 }
504
505 /*
506 * Read the header of the XLogData message, enclosed in the CopyData
507 * message. We only need the WAL location field (dataStart), the rest
508 * of the header is ignored.
509 */
510 hdr_len = 1; /* msgtype 'w' */
511 hdr_len += 8; /* dataStart */
512 hdr_len += 8; /* walEnd */
513 hdr_len += 8; /* sendTime */
514 if (r < hdr_len + 1)
515 {
516 pg_log_error("streaming header too small: %d", r);
517 goto error;
518 }
519
520 /* Extract WAL location for this block */
521 cur_record_lsn = fe_recvint64(&copybuf[1]);
522
523 if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
524 {
525 /*
526 * We've read past our endpoint, so prepare to go away being
527 * cautious about what happens to our output data.
528 */
529 if (!flushAndSendFeedback(conn, &now))
530 goto error;
531 prepareToTerminate(conn, endpos, false, cur_record_lsn);
532 time_to_abort = true;
533 break;
534 }
535
536 output_written_lsn = Max(cur_record_lsn, output_written_lsn);
537
538 bytes_left = r - hdr_len;
539 bytes_written = 0;
540
541 /* signal that a fsync is needed */
542 output_needs_fsync = true;
543
544 while (bytes_left)
545 {
546 int ret;
547
548 ret = write(outfd,
549 copybuf + hdr_len + bytes_written,
550 bytes_left);
551
552 if (ret < 0)
553 {
554 pg_log_error("could not write %u bytes to log file \"%s\": %m",
555 bytes_left, outfile);
556 goto error;
557 }
558
559 /* Write was successful, advance our position */
560 bytes_written += ret;
561 bytes_left -= ret;
562 }
563
564 if (write(outfd, "\n", 1) != 1)
565 {
566 pg_log_error("could not write %u bytes to log file \"%s\": %m",
567 1, outfile);
568 goto error;
569 }
570
571 if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
572 {
573 /* endpos was exactly the record we just processed, we're done */
574 if (!flushAndSendFeedback(conn, &now))
575 goto error;
576 prepareToTerminate(conn, endpos, false, cur_record_lsn);
577 time_to_abort = true;
578 break;
579 }
580 }
581
582 res = PQgetResult(conn);
583 if (PQresultStatus(res) == PGRES_COPY_OUT)
584 {
585 /*
586 * We're doing a client-initiated clean exit and have sent CopyDone to
587 * the server. We've already sent replay confirmation and fsync'd so
588 * we can just clean up the connection now.
589 */
590 goto error;
591 }
592 else if (PQresultStatus(res) != PGRES_COMMAND_OK)
593 {
594 pg_log_error("unexpected termination of replication stream: %s",
595 PQresultErrorMessage(res));
596 goto error;
597 }
598 PQclear(res);
599
600 if (outfd != -1 && strcmp(outfile, "-") != 0)
601 {
602 TimestampTz t = feGetCurrentTimestamp();
603
604 /* no need to jump to error on failure here, we're finishing anyway */
605 OutputFsync(t);
606
607 if (close(outfd) != 0)
608 pg_log_error("could not close file \"%s\": %m", outfile);
609 }
610 outfd = -1;
611error:
612 if (copybuf != NULL)
613 {
614 PQfreemem(copybuf);
615 copybuf = NULL;
616 }
617 destroyPQExpBuffer(query);
618 PQfinish(conn);
619 conn = NULL;
620}
621
622/*
623 * Unfortunately we can't do sensible signal handling on windows...
624 */
625#ifndef WIN32
626
627/*
628 * When sigint is called, just tell the system to exit at the next possible
629 * moment.
630 */
631static void
632sigint_handler(int signum)
633{
634 time_to_abort = true;
635}
636
637/*
638 * Trigger the output file to be reopened.
639 */
640static void
641sighup_handler(int signum)
642{
643 output_reopen = true;
644}
645#endif
646
647
648int
649main(int argc, char **argv)
650{
651 static struct option long_options[] = {
652/* general options */
653 {"file", required_argument, NULL, 'f'},
654 {"fsync-interval", required_argument, NULL, 'F'},
655 {"no-loop", no_argument, NULL, 'n'},
656 {"verbose", no_argument, NULL, 'v'},
657 {"version", no_argument, NULL, 'V'},
658 {"help", no_argument, NULL, '?'},
659/* connection options */
660 {"dbname", required_argument, NULL, 'd'},
661 {"host", required_argument, NULL, 'h'},
662 {"port", required_argument, NULL, 'p'},
663 {"username", required_argument, NULL, 'U'},
664 {"no-password", no_argument, NULL, 'w'},
665 {"password", no_argument, NULL, 'W'},
666/* replication options */
667 {"startpos", required_argument, NULL, 'I'},
668 {"endpos", required_argument, NULL, 'E'},
669 {"option", required_argument, NULL, 'o'},
670 {"plugin", required_argument, NULL, 'P'},
671 {"status-interval", required_argument, NULL, 's'},
672 {"slot", required_argument, NULL, 'S'},
673/* action */
674 {"create-slot", no_argument, NULL, 1},
675 {"start", no_argument, NULL, 2},
676 {"drop-slot", no_argument, NULL, 3},
677 {"if-not-exists", no_argument, NULL, 4},
678 {NULL, 0, NULL, 0}
679 };
680 int c;
681 int option_index;
682 uint32 hi,
683 lo;
684 char *db_name;
685
686 pg_logging_init(argv[0]);
687 progname = get_progname(argv[0]);
688 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
689
690 if (argc > 1)
691 {
692 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
693 {
694 usage();
695 exit(0);
696 }
697 else if (strcmp(argv[1], "-V") == 0 ||
698 strcmp(argv[1], "--version") == 0)
699 {
700 puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
701 exit(0);
702 }
703 }
704
705 while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
706 long_options, &option_index)) != -1)
707 {
708 switch (c)
709 {
710/* general options */
711 case 'f':
712 outfile = pg_strdup(optarg);
713 break;
714 case 'F':
715 fsync_interval = atoi(optarg) * 1000;
716 if (fsync_interval < 0)
717 {
718 pg_log_error("invalid fsync interval \"%s\"", optarg);
719 exit(1);
720 }
721 break;
722 case 'n':
723 noloop = 1;
724 break;
725 case 'v':
726 verbose++;
727 break;
728/* connection options */
729 case 'd':
730 dbname = pg_strdup(optarg);
731 break;
732 case 'h':
733 dbhost = pg_strdup(optarg);
734 break;
735 case 'p':
736 if (atoi(optarg) <= 0)
737 {
738 pg_log_error("invalid port number \"%s\"", optarg);
739 exit(1);
740 }
741 dbport = pg_strdup(optarg);
742 break;
743 case 'U':
744 dbuser = pg_strdup(optarg);
745 break;
746 case 'w':
747 dbgetpassword = -1;
748 break;
749 case 'W':
750 dbgetpassword = 1;
751 break;
752/* replication options */
753 case 'I':
754 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
755 {
756 pg_log_error("could not parse start position \"%s\"", optarg);
757 exit(1);
758 }
759 startpos = ((uint64) hi) << 32 | lo;
760 break;
761 case 'E':
762 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
763 {
764 pg_log_error("could not parse end position \"%s\"", optarg);
765 exit(1);
766 }
767 endpos = ((uint64) hi) << 32 | lo;
768 break;
769 case 'o':
770 {
771 char *data = pg_strdup(optarg);
772 char *val = strchr(data, '=');
773
774 if (val != NULL)
775 {
776 /* remove =; separate data from val */
777 *val = '\0';
778 val++;
779 }
780
781 noptions += 1;
782 options = pg_realloc(options, sizeof(char *) * noptions * 2);
783
784 options[(noptions - 1) * 2] = data;
785 options[(noptions - 1) * 2 + 1] = val;
786 }
787
788 break;
789 case 'P':
790 plugin = pg_strdup(optarg);
791 break;
792 case 's':
793 standby_message_timeout = atoi(optarg) * 1000;
794 if (standby_message_timeout < 0)
795 {
796 pg_log_error("invalid status interval \"%s\"", optarg);
797 exit(1);
798 }
799 break;
800 case 'S':
801 replication_slot = pg_strdup(optarg);
802 break;
803/* action */
804 case 1:
805 do_create_slot = true;
806 break;
807 case 2:
808 do_start_slot = true;
809 break;
810 case 3:
811 do_drop_slot = true;
812 break;
813 case 4:
814 slot_exists_ok = true;
815 break;
816
817 default:
818
819 /*
820 * getopt_long already emitted a complaint
821 */
822 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
823 progname);
824 exit(1);
825 }
826 }
827
828 /*
829 * Any non-option arguments?
830 */
831 if (optind < argc)
832 {
833 pg_log_error("too many command-line arguments (first is \"%s\")",
834 argv[optind]);
835 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
836 progname);
837 exit(1);
838 }
839
840 /*
841 * Required arguments
842 */
843 if (replication_slot == NULL)
844 {
845 pg_log_error("no slot specified");
846 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
847 progname);
848 exit(1);
849 }
850
851 if (do_start_slot && outfile == NULL)
852 {
853 pg_log_error("no target file specified");
854 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
855 progname);
856 exit(1);
857 }
858
859 if (!do_drop_slot && dbname == NULL)
860 {
861 pg_log_error("no database specified");
862 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
863 progname);
864 exit(1);
865 }
866
867 if (!do_drop_slot && !do_create_slot && !do_start_slot)
868 {
869 pg_log_error("at least one action needs to be specified");
870 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
871 progname);
872 exit(1);
873 }
874
875 if (do_drop_slot && (do_create_slot || do_start_slot))
876 {
877 pg_log_error("cannot use --create-slot or --start together with --drop-slot");
878 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
879 progname);
880 exit(1);
881 }
882
883 if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
884 {
885 pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
886 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
887 progname);
888 exit(1);
889 }
890
891 if (endpos != InvalidXLogRecPtr && !do_start_slot)
892 {
893 pg_log_error("--endpos may only be specified with --start");
894 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
895 progname);
896 exit(1);
897 }
898
899#ifndef WIN32
900 pqsignal(SIGINT, sigint_handler);
901 pqsignal(SIGHUP, sighup_handler);
902#endif
903
904 /*
905 * Obtain a connection to server. This is not really necessary but it
906 * helps to get more precise error messages about authentication, required
907 * GUC parameters and such.
908 */
909 conn = GetConnection();
910 if (!conn)
911 /* Error message already written in GetConnection() */
912 exit(1);
913 atexit(disconnect_atexit);
914
915 /*
916 * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
917 * replication connection.
918 */
919 if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
920 exit(1);
921
922 if (db_name == NULL)
923 {
924 pg_log_error("could not establish database-specific replication connection");
925 exit(1);
926 }
927
928 /*
929 * Set umask so that directories/files are created with the same
930 * permissions as directories/files in the source data directory.
931 *
932 * pg_mode_mask is set to owner-only by default and then updated in
933 * GetConnection() where we get the mode from the server-side with
934 * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
935 */
936 umask(pg_mode_mask);
937
938 /* Drop a replication slot. */
939 if (do_drop_slot)
940 {
941 if (verbose)
942 pg_log_info("dropping replication slot \"%s\"", replication_slot);
943
944 if (!DropReplicationSlot(conn, replication_slot))
945 exit(1);
946 }
947
948 /* Create a replication slot. */
949 if (do_create_slot)
950 {
951 if (verbose)
952 pg_log_info("creating replication slot \"%s\"", replication_slot);
953
954 if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
955 false, false, slot_exists_ok))
956 exit(1);
957 startpos = InvalidXLogRecPtr;
958 }
959
960 if (!do_start_slot)
961 exit(0);
962
963 /* Stream loop */
964 while (true)
965 {
966 StreamLogicalLog();
967 if (time_to_abort)
968 {
969 /*
970 * We've been Ctrl-C'ed or reached an exit limit condition. That's
971 * not an error, so exit without an errorcode.
972 */
973 exit(0);
974 }
975 else if (noloop)
976 {
977 pg_log_error("disconnected");
978 exit(1);
979 }
980 else
981 {
982 /* translator: check source for value for %d */
983 pg_log_info("disconnected; waiting %d seconds to try again",
984 RECONNECT_SLEEP_TIME);
985 pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
986 }
987 }
988}
989
990/*
991 * Fsync our output data, and send a feedback message to the server. Returns
992 * true if successful, false otherwise.
993 *
994 * If successful, *now is updated to the current timestamp just before sending
995 * feedback.
996 */
997static bool
998flushAndSendFeedback(PGconn *conn, TimestampTz *now)
999{
1000 /* flush data to disk, so that we send a recent flush pointer */
1001 if (!OutputFsync(*now))
1002 return false;
1003 *now = feGetCurrentTimestamp();
1004 if (!sendFeedback(conn, *now, true, false))
1005 return false;
1006
1007 return true;
1008}
1009
1010/*
1011 * Try to inform the server about our upcoming demise, but don't wait around or
1012 * retry on failure.
1013 */
1014static void
1015prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
1016{
1017 (void) PQputCopyEnd(conn, NULL);
1018 (void) PQflush(conn);
1019
1020 if (verbose)
1021 {
1022 if (keepalive)
1023 pg_log_info("end position %X/%X reached by keepalive",
1024 (uint32) (endpos >> 32), (uint32) endpos);
1025 else
1026 pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1027 (uint32) (endpos >> 32), (uint32) (endpos),
1028 (uint32) (lsn >> 32), (uint32) lsn);
1029 }
1030}
1031